| from lib import StringIO, Packer |
| from lib import Container, ListContainer, AttrDict, LazyContainer |
| |
| |
| #=============================================================================== |
| # exceptions |
| #=============================================================================== |
class ConstructError(Exception):
    """Base class of all construct-specific exceptions."""
    __slots__ = []

class FieldError(ConstructError):
    """A field held an unexpected amount of data, or failed to pack/unpack."""
    __slots__ = []

class SizeofError(ConstructError):
    """The size of a construct cannot be calculated."""
    __slots__ = []

class AdaptationError(ConstructError):
    """Error category for adapters (presumably raised by Adapter subclasses
    defined elsewhere -- TODO confirm)."""
    __slots__ = []

class ArrayError(ConstructError):
    """An array-like repeater found a wrong element count or no terminator."""
    __slots__ = []

class RangeError(ConstructError):
    """The number of elements falls outside the allowed range."""
    __slots__ = []

class SwitchError(ConstructError):
    """No case matched the key and no default case was defined."""
    __slots__ = []

class SelectError(ConstructError):
    """None of the candidate subconstructs matched."""
    __slots__ = []

class TerminatorError(ConstructError):
    """Error category for terminators (presumably raised by a terminator
    construct defined elsewhere -- TODO confirm)."""
    __slots__ = []
| |
| #=============================================================================== |
| # abstract constructs |
| #=============================================================================== |
class Construct(object):
    """
    The mother of all constructs!

    User API:
    * parse(buf) - parses an in-memory buffer (usually a string)
    * parse_stream(stream) - parses a stream (in-memory, file, pipe, ...)
    * build(obj) - builds the object into an in-memory buffer (a string)
    * build_stream(obj, stream) - builds the object into the given stream
    * sizeof(context) - calculates the size of the construct, if possible,
      based on the context

    Overridable methods for subclassing:
    * _parse(stream, context) - low-level parse from stream
    * _build(obj, stream, context) - low-level build to stream
    * _sizeof(context) - low-level compute size

    Flags API:
    * _set_flag(flag) - sets the given flag/flags
    * _clear_flag(flag) - clears the given flag/flags
    * _inherit_flags(*subcons) - inherits the flags of subcons
    * _is_flag(flag) - is the flag set? (predicate)

    Overridable methods for the copy-API:
    * __getstate__() - returns a dict of the attributes of self
    * __setstate__(attrs) - sets the attrs to self

    Attributes:
    All constructs have a name and flags. The name is used for naming
    struct-members and context dicts. Note that the name must be a string or
    None (if the name is not needed). A single underscore ("_") is a reserved
    name, and so are names starting with a less-than character ("<"). The name
    should be descriptive, short, and valid as a python identifier (although
    these rules are not enforced).

    The flags specify additional behavioral information about this construct.
    The flags are used by enclosing constructs to determine a proper course
    of action. Usually, flags are "inherited", i.e., an enclosing construct
    inherits the flags of its subconstruct. The enclosing construct may
    set new flags or clear existing ones, as necessary.

    For example, if FLAG_COPY_CONTEXT is set, repeaters will pass a copy of
    the context for each iteration, which is necessary for OnDemand parsing.
    """
    FLAG_COPY_CONTEXT = 0x0001
    FLAG_DYNAMIC = 0x0002
    FLAG_EMBED = 0x0004
    FLAG_NESTING = 0x0008

    __slots__ = ["name", "conflags"]

    def __init__(self, name, flags = 0):
        """
        Parameters:
        * name - a string, or None when the construct is anonymous
        * flags - the initial flag bits (default is 0)

        Raises TypeError if name is neither a string nor None, and ValueError
        if name is reserved ("_" or anything starting with "<").
        """
        if name is not None:
            # isinstance (rather than an exact type match) also admits
            # subclasses of str
            if not isinstance(name, str):
                raise TypeError("name must be a string or None", name)
            if name == "_" or name.startswith("<"):
                raise ValueError("reserved name", name)
        self.name = name
        self.conflags = flags

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.name)

    def _set_flag(self, flag):
        """sets the given flag (or OR-ed combination of flags)"""
        self.conflags |= flag

    def _clear_flag(self, flag):
        """clears the given flag (or OR-ed combination of flags)"""
        self.conflags &= ~flag

    def _inherit_flags(self, *subcons):
        """sets on self every flag that is set on any of the subcons"""
        for sc in subcons:
            self._set_flag(sc.conflags)

    def _is_flag(self, flag):
        """returns True if any bit of `flag` is set on this construct"""
        return bool(self.conflags & flag)

    def __getstate__(self):
        """
        returns a dict holding the instance attributes: the contents of
        __dict__ (if the instance has one) plus every slot that is set
        """
        attrs = {}
        if hasattr(self, "__dict__"):
            attrs.update(self.__dict__)
        # walk the whole MRO so that slots declared by every base class are
        # collected, not only those reachable through the first base
        for klass in type(self).__mro__:
            slots = klass.__dict__.get("__slots__", ())
            if isinstance(slots, str):
                # a bare string declares a single slot
                slots = (slots,)
            for name in slots:
                if hasattr(self, name):
                    attrs[name] = getattr(self, name)
        return attrs

    def __setstate__(self, attrs):
        """sets every (name, value) pair of the attrs dict on self"""
        # items() works on both python 2 and python 3
        for name, value in attrs.items():
            setattr(self, name, value)

    def __copy__(self):
        """returns a copy of this construct"""
        # bypass __init__; the state is transferred through the copy-API
        self2 = object.__new__(self.__class__)
        self2.__setstate__(self.__getstate__())
        return self2

    def parse(self, data):
        """parses data given as a buffer or a string (in-memory)"""
        return self.parse_stream(StringIO(data))

    def parse_stream(self, stream):
        """parses data read directly from a stream"""
        return self._parse(stream, AttrDict())

    def _parse(self, stream, context):
        raise NotImplementedError()

    def build(self, obj):
        """builds an object in a string (in memory)"""
        stream = StringIO()
        self.build_stream(obj, stream)
        return stream.getvalue()

    def build_stream(self, obj, stream):
        """builds an object into a stream"""
        self._build(obj, stream, AttrDict())

    def _build(self, obj, stream, context):
        raise NotImplementedError()

    def sizeof(self, context = None):
        """calculates the size of the construct (if possible) using the
        given context; a fresh empty context is used when none is given"""
        if context is None:
            context = AttrDict()
        return self._sizeof(context)

    def _sizeof(self, context):
        raise SizeofError("can't calculate size")
| |
class Subconstruct(Construct):
    """
    Abstract subconstruct: wraps an inner construct, taking over its name
    and flags, and forwards parsing, building and size calculation to it.

    Parameters:
    * subcon - the construct to wrap
    """
    __slots__ = ["subcon"]

    def __init__(self, subcon):
        # the wrapper presents itself under the inner construct's name/flags
        Construct.__init__(self, subcon.name, subcon.conflags)
        self.subcon = subcon

    def _parse(self, stream, context):
        inner = self.subcon
        return inner._parse(stream, context)

    def _build(self, obj, stream, context):
        inner = self.subcon
        inner._build(obj, stream, context)

    def _sizeof(self, context):
        inner = self.subcon
        return inner._sizeof(context)
| |
class Adapter(Subconstruct):
    """
    Abstract adapter: the object parsed by the subcon is passed through
    _decode, and the object to build is passed through _encode before it is
    handed to the subcon. Subclasses must implement both hooks.

    Parameters:
    * subcon - the construct to wrap
    """
    __slots__ = []

    def _parse(self, stream, context):
        raw = self.subcon._parse(stream, context)
        return self._decode(raw, context)

    def _build(self, obj, stream, context):
        encoded = self._encode(obj, context)
        self.subcon._build(encoded, stream, context)

    def _decode(self, obj, context):
        """converts the parsed object; must be overridden"""
        raise NotImplementedError()

    def _encode(self, obj, context):
        """converts the object to build; must be overridden"""
        raise NotImplementedError()
| |
| |
| #=============================================================================== |
| # primitives |
| #=============================================================================== |
| def _read_stream(stream, length): |
| if length < 0: |
| raise ValueError("length must be >= 0", length) |
| data = stream.read(length) |
| if len(data) != length: |
| raise FieldError("expected %d, found %d" % (length, len(data))) |
| return data |
| |
| def _write_stream(stream, length, data): |
| if length < 0: |
| raise ValueError("length must be >= 0", length) |
| if len(data) != length: |
| raise FieldError("expected %d, found %d" % (length, len(data))) |
| stream.write(data) |
| |
class StaticField(Construct):
    """
    A field whose size is fixed at construction time. Parsing returns the
    raw data read from the stream; building writes the object as-is, after
    checking that it has the expected length.

    Parameters:
    * name - the name of the field
    * length - the length (an integer)

    Example:
    StaticField("foo", 5)
    """
    __slots__ = ["length"]

    def __init__(self, name, length):
        Construct.__init__(self, name)
        self.length = length

    def _parse(self, stream, context):
        size = self.length
        return _read_stream(stream, size)

    def _build(self, obj, stream, context):
        size = self.length
        _write_stream(stream, size, obj)

    def _sizeof(self, context):
        # the size never depends on the context
        return self.length
| |
class FormatField(StaticField):
    """
    A field that uses python's built-in struct module to pack/unpack data
    according to a format string.
    Note: this field has been originally implemented as an Adapter, but it
    was made a construct for performance reasons.

    Parameters:
    * name - the name
    * endianity - "<" for little endian, ">" for big endian, or "=" for native
    * format - a single format character

    Example:
    FormatField("foo", ">", "L")
    """
    __slots__ = ["packer"]

    def __init__(self, name, endianity, format):
        if endianity not in (">", "<", "="):
            raise ValueError("endianity must be be '=', '<', or '>'",
                endianity)
        if len(format) != 1:
            raise ValueError("must specify one and only one format char")
        self.packer = Packer(endianity + format)
        # the field length is whatever the packer reports for the format
        StaticField.__init__(self, name, self.packer.size)

    def __getstate__(self):
        """returns the attrs dict, with the packer replaced by its format
        string (the packer is rebuilt from it by __setstate__)"""
        attrs = StaticField.__getstate__(self)
        attrs["packer"] = attrs["packer"].format
        return attrs

    def __setstate__(self, attrs):
        """restores the attrs, rebuilding the packer from its format string"""
        # work on a copy so the caller's dict is left untouched
        attrs = dict(attrs)
        attrs["packer"] = Packer(attrs["packer"])
        # the unbound base-class method must be given `self` explicitly
        return StaticField.__setstate__(self, attrs)

    def _parse(self, stream, context):
        """reads `length` items and unpacks them into a single value; any
        failure is reported as FieldError"""
        try:
            return self.packer.unpack(_read_stream(stream, self.length))[0]
        except Exception as ex:
            raise FieldError(ex)

    def _build(self, obj, stream, context):
        """packs obj and writes it to the stream; any failure is reported
        as FieldError"""
        try:
            _write_stream(stream, self.length, self.packer.pack(obj))
        except Exception as ex:
            raise FieldError(ex)
| |
class MetaField(Construct):
    """
    A field of a meta-length: the length is not fixed, but computed at
    runtime from the context, each time the field is parsed, built or sized.
    The field is flagged as dynamic.

    Parameters:
    * name - the name of the field
    * lengthfunc - a function that takes the context as a parameter and
      returns the length of the field

    Example:
    MetaField("foo", lambda ctx: 5)
    """
    __slots__ = ["lengthfunc"]

    def __init__(self, name, lengthfunc):
        Construct.__init__(self, name)
        self.lengthfunc = lengthfunc
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        size = self.lengthfunc(context)
        return _read_stream(stream, size)

    def _build(self, obj, stream, context):
        size = self.lengthfunc(context)
        _write_stream(stream, size, obj)

    def _sizeof(self, context):
        return self.lengthfunc(context)
| |
| |
| #=============================================================================== |
| # arrays and repeaters |
| #=============================================================================== |
class MetaArray(Subconstruct):
    """
    An array (repeater) of a meta-count. The array will iterate exactly
    `countfunc()` times. Will raise ArrayError if less elements are found.
    See also Array, Range and RepeatUntil.

    Parameters:
    * countfunc - a function that takes the context as a parameter and returns
      the number of elements of the array (count)
    * subcon - the subcon to repeat `countfunc()` times

    Example:
    MetaArray(lambda ctx: 5, UBInt8("foo"))
    """
    __slots__ = ["countfunc"]

    def __init__(self, countfunc, subcon):
        Subconstruct.__init__(self, subcon)
        self.countfunc = countfunc
        # the array itself handles context copying for its elements, so the
        # flag is not propagated to enclosing constructs
        self._clear_flag(self.FLAG_COPY_CONTEXT)
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        """parses `countfunc(context)` elements into a ListContainer; raises
        ArrayError (wrapping the cause) if an element fails to parse"""
        obj = ListContainer()
        c = 0
        count = self.countfunc(context)
        # self.subcon.conflags still carries the subcon's own flag
        copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
        try:
            while c < count:
                # each element gets a private context copy when required
                subctx = context.__copy__() if copy_context else context
                obj.append(self.subcon._parse(stream, subctx))
                c += 1
        except ConstructError as ex:
            raise ArrayError("expected %d, found %d" % (count, c), ex)
        return obj

    def _build(self, obj, stream, context):
        """builds every element of obj; raises ArrayError if obj does not
        hold exactly `countfunc(context)` elements"""
        count = self.countfunc(context)
        if len(obj) != count:
            raise ArrayError("expected %d, found %d" % (count, len(obj)))
        copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
        for subobj in obj:
            subctx = context.__copy__() if copy_context else context
            self.subcon._build(subobj, stream, subctx)

    def _sizeof(self, context):
        return self.subcon._sizeof(context) * self.countfunc(context)
| |
class Range(Subconstruct):
    """
    A range-array. The subcon is repeated between `mincount` and `maxcout`
    times: parsing stops at the first element that fails (rewinding the
    stream to the start of that element) or once `maxcout` elements were
    read. If fewer than `mincount` elements are found, RangeError is raised.
    See also GreedyRange and OptionalGreedyRange.

    Notes:
    * requires a seekable stream.
    * the maximal count is spelled `maxcout` in both the parameter and the
      attribute; the spelling is kept for compatibility.

    Parameters:
    * mincount - the minimal count (an integer)
    * maxcout - the maximal count (an integer)
    * subcon - the subcon to repeat

    Example:
    Range(5, 8, UBInt8("foo"))
    """
    __slots__ = ["mincount", "maxcout"]

    def __init__(self, mincount, maxcout, subcon):
        Subconstruct.__init__(self, subcon)
        self.mincount = mincount
        self.maxcout = maxcout
        self._clear_flag(self.FLAG_COPY_CONTEXT)
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        items = ListContainer()
        parsed = 0
        try:
            copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
            while parsed < self.maxcout:
                # remember where this element starts, to rewind on failure
                pos = stream.tell()
                subctx = context.__copy__() if copy_context else context
                items.append(self.subcon._parse(stream, subctx))
                parsed += 1
        except ConstructError:
            if parsed < self.mincount:
                raise RangeError("expected %d to %d, found %d" %
                    (self.mincount, self.maxcout, parsed))
            # enough elements were read: discard the failed attempt
            stream.seek(pos)
        return items

    def _build(self, obj, stream, context):
        total = len(obj)
        if not (self.mincount <= total and total <= self.maxcout):
            raise RangeError("expected %d to %d, found %d" %
                (self.mincount, self.maxcout, total))
        built = 0
        try:
            copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
            for element in obj:
                subctx = context.__copy__() if copy_context else context
                self.subcon._build(element, stream, subctx)
                built += 1
        except ConstructError:
            # a failing element only matters while the minimum is not met;
            # otherwise building silently stops here
            if built < self.mincount:
                raise RangeError("expected %d to %d, found %d" %
                    (self.mincount, self.maxcout, len(obj)))

    def _sizeof(self, context):
        raise SizeofError("can't calculate size")
| |
class RepeatUntil(Subconstruct):
    """
    An array that repeats until the predicate indicates it to stop. Note that
    the last element (which caused the repeat to exit) is included in the
    return value.

    Parameters:
    * predicate - a predicate function that takes (obj, context) and returns
      True if the stop-condition is met, or False to continue.
    * subcon - the subcon to repeat.

    Example:
    # will read chars until \\x00 (inclusive)
    RepeatUntil(lambda obj, ctx: obj == "\\x00",
        Field("chars", 1)
    )
    """
    __slots__ = ["predicate"]

    def __init__(self, predicate, subcon):
        Subconstruct.__init__(self, subcon)
        self.predicate = predicate
        self._clear_flag(self.FLAG_COPY_CONTEXT)
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        """parses elements until the predicate is satisfied and returns them
        as a list; raises ArrayError (wrapping the cause) if an element
        fails to parse before the terminator is found"""
        obj = []
        copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
        try:
            while True:
                subctx = context.__copy__() if copy_context else context
                subobj = self.subcon._parse(stream, subctx)
                obj.append(subobj)
                # the predicate always sees the original context
                if self.predicate(subobj, context):
                    break
        except ConstructError as ex:
            raise ArrayError("missing terminator", ex)
        return obj

    def _build(self, obj, stream, context):
        """builds the elements of obj up to and including the first one
        that satisfies the predicate; raises ArrayError if none does"""
        copy_context = self.subcon.conflags & self.FLAG_COPY_CONTEXT
        for subobj in obj:
            # copy the context only when the subcon asks for it, exactly as
            # _parse does
            subctx = context.__copy__() if copy_context else context
            self.subcon._build(subobj, stream, subctx)
            if self.predicate(subobj, context):
                return
        raise ArrayError("missing terminator")

    def _sizeof(self, context):
        raise SizeofError("can't calculate size")
| |
| |
| #=============================================================================== |
| # structures and sequences |
| #=============================================================================== |
class Struct(Construct):
    """
    A sequence of named constructs, similar to structs in C. The elements are
    parsed and built in the order they are defined. Subcons without a name
    are processed, but their value is not stored. Subcons flagged with
    FLAG_EMBED operate directly on this struct's object and context.
    See also Embedded.

    Parameters:
    * name - the name of the structure
    * subcons - a sequence of subconstructs that make up this structure.
    * nested - a keyword-only argument that indicates whether this struct
      creates a nested context. The default is True. This parameter is
      considered "advanced usage", and may be removed in the future.

    Example:
    Struct("foo",
        UBInt8("first_element"),
        UBInt16("second_element"),
        Padding(2),
        UBInt8("third_element"),
    )
    """
    __slots__ = ["subcons", "nested"]

    def __init__(self, name, *subcons, **kw):
        self.nested = kw.pop("nested", True)
        if kw:
            raise TypeError("the only keyword argument accepted is 'nested'", kw)
        Construct.__init__(self, name)
        self.subcons = subcons
        self._inherit_flags(*subcons)
        # a struct is never embedded just because one of its members is
        self._clear_flag(self.FLAG_EMBED)

    def _parse(self, stream, context):
        if "<obj>" in context:
            # embedded: fill the enclosing struct's object, in its context
            obj = context["<obj>"]
            del context["<obj>"]
        else:
            obj = Container()
            if self.nested:
                # the parent context stays reachable as "_"
                context = AttrDict(_ = context)
        for member in self.subcons:
            if member.conflags & self.FLAG_EMBED:
                # hand our object over through the reserved "<obj>" key
                context["<obj>"] = obj
                member._parse(stream, context)
                continue
            value = member._parse(stream, context)
            if member.name is not None:
                obj[member.name] = value
                context[member.name] = value
        return obj

    def _build(self, obj, stream, context):
        if "<unnested>" in context:
            # embedded: keep building in the enclosing struct's context
            del context["<unnested>"]
        elif self.nested:
            context = AttrDict(_ = context)
        for member in self.subcons:
            if member.conflags & self.FLAG_EMBED:
                context["<unnested>"] = True
                value = obj
            elif member.name is None:
                value = None
            else:
                value = getattr(obj, member.name)
                context[member.name] = value
            member._build(value, stream, context)

    def _sizeof(self, context):
        if self.nested:
            context = AttrDict(_ = context)
        total = 0
        for member in self.subcons:
            total = total + member._sizeof(context)
        return total
| |
class Sequence(Struct):
    """
    A sequence of unnamed constructs. The elements are parsed and built in the
    order they are defined, and the parsed values are collected in a list
    (instead of being stored by name, as Struct does). Note that values of
    subcons whose name is None are not collected.
    See also Embedded.

    Parameters:
    * name - the name of the structure
    * subcons - a sequence of subconstructs that make up this structure.
    * nested - a keyword-only argument that indicates whether this struct
      creates a nested context. The default is True. This parameter is
      considered "advanced usage", and may be removed in the future.

    Example:
    Sequence("foo",
        UBInt8("first_element"),
        UBInt16("second_element"),
        Padding(2),
        UBInt8("third_element"),
    )
    """
    __slots__ = []

    def _parse(self, stream, context):
        if "<obj>" in context:
            # embedded: append to the enclosing sequence's list
            obj = context["<obj>"]
            del context["<obj>"]
        else:
            obj = ListContainer()
            if self.nested:
                context = AttrDict(_ = context)
        for sc in self.subcons:
            if sc.conflags & self.FLAG_EMBED:
                context["<obj>"] = obj
                sc._parse(stream, context)
            else:
                subobj = sc._parse(stream, context)
                if sc.name is not None:
                    obj.append(subobj)
                    context[sc.name] = subobj
        return obj

    def _build(self, obj, stream, context):
        if "<unnested>" in context:
            del context["<unnested>"]
        elif self.nested:
            context = AttrDict(_ = context)
        objiter = iter(obj)
        for sc in self.subcons:
            if sc.conflags & self.FLAG_EMBED:
                context["<unnested>"] = True
                # an embedded subcon consumes items from the same iterator
                subobj = objiter
            elif sc.name is None:
                subobj = None
            else:
                # builtin next() works on python 2.6+ and python 3, unlike
                # the python-2-only iterator.next() method
                subobj = next(objiter)
                context[sc.name] = subobj
            sc._build(subobj, stream, context)
| |
class Union(Construct):
    """
    A set of overlapping fields (like unions in C). When parsing, every
    subcon peeks at the same data, after which the stream is advanced by the
    size of the first subcon (called "master"); when building and sizing,
    only the master is used.

    Parameters:
    * name - the name of the union
    * master - the master subcon, i.e., the subcon used for building and
      calculating the total size
    * subcons - additional subcons

    Example:
    Union("what_are_four_bytes",
        UBInt32("one_dword"),
        Struct("two_words", UBInt16("first"), UBInt16("second")),
        Struct("four_bytes",
            UBInt8("a"),
            UBInt8("b"),
            UBInt8("c"),
            UBInt8("d")
        ),
    )
    """
    __slots__ = ["parser", "builder"]

    def __init__(self, name, master, *subcons, **kw):
        # NOTE: keyword arguments are accepted but not used
        Construct.__init__(self, name)
        followers = [Peek(sc) for sc in subcons]
        # an anonymous field that consumes as many bytes as the master takes
        followers.append(MetaField(None, lambda ctx: master._sizeof(ctx)))
        master_peek = Peek(master, perform_build = True)
        self.parser = Struct(name, master_peek, *followers)
        self.builder = Struct(name, master)

    def _parse(self, stream, context):
        parser = self.parser
        return parser._parse(stream, context)

    def _build(self, obj, stream, context):
        builder = self.builder
        return builder._build(obj, stream, context)

    def _sizeof(self, context):
        builder = self.builder
        return builder._sizeof(context)
| |
| #=============================================================================== |
| # conditional |
| #=============================================================================== |
class Switch(Construct):
    """
    A conditional branch. Switch will choose the case to follow based on
    the return value of keyfunc. If no case is matched, and no default value
    is given, SwitchError will be raised.
    See also Pass.

    Parameters:
    * name - the name of the construct
    * keyfunc - a function that takes the context and returns a key, which
      will be used to choose the relevant case.
    * cases - a dictionary mapping keys to constructs. the keys can be any
      values that may be returned by keyfunc.
    * default - a default value to use when the key is not found in the cases.
      if not supplied, an exception will be raised when the key is not found.
      You can use the builtin construct Pass for 'do-nothing'.
    * include_key - whether or not to include the key in the return value
      of parsing. default is False.

    Example:
    Struct("foo",
        UBInt8("type"),
        Switch("value", lambda ctx: ctx.type, {
                1 : UBInt8("spam"),
                2 : UBInt16("spam"),
                3 : UBInt32("spam"),
                4 : UBInt64("spam"),
            }
        ),
    )
    """

    class NoDefault(Construct):
        """Stand-in default case: every operation raises SwitchError."""
        def _parse(self, stream, context):
            raise SwitchError("no default case defined")
        def _build(self, obj, stream, context):
            raise SwitchError("no default case defined")
        def _sizeof(self, context):
            raise SwitchError("no default case defined")
    # the class name is rebound to a single shared instance
    NoDefault = NoDefault("NoDefault")

    __slots__ = ["subcons", "keyfunc", "cases", "default", "include_key"]

    def __init__(self, name, keyfunc, cases, default = NoDefault,
                 include_key = False):
        Construct.__init__(self, name)
        self.keyfunc = keyfunc
        self.cases = cases
        self.default = default
        self.include_key = include_key
        # flags are OR-ed in, so inheriting them once is sufficient
        self._inherit_flags(*cases.values())
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        """parses with the case selected by keyfunc(context); returns
        (key, obj) if include_key is set, otherwise just obj"""
        key = self.keyfunc(context)
        obj = self.cases.get(key, self.default)._parse(stream, context)
        if self.include_key:
            return key, obj
        else:
            return obj

    def _build(self, obj, stream, context):
        """builds with the selected case; if include_key is set, obj must be
        a (key, obj) pair and that key selects the case"""
        if self.include_key:
            key, obj = obj
        else:
            key = self.keyfunc(context)
        case = self.cases.get(key, self.default)
        case._build(obj, stream, context)

    def _sizeof(self, context):
        case = self.cases.get(self.keyfunc(context), self.default)
        return case._sizeof(context)
| |
class Select(Construct):
    """
    Selects the first matching subconstruct. It will literally try each of
    the subconstructs, in order, until one succeeds. Every attempt works on
    a copy of the context; only the successful attempt's changes are merged
    back into the real context.

    Notes:
    * requires a seekable stream.

    Parameters:
    * name - the name of the construct
    * subcons - the subcons to try (order-sensitive)
    * include_name - a keyword only argument, indicating whether to include
      the name of the selected subcon in the return value of parsing. default
      is false.

    Example:
    Select("foo",
        UBInt64("large"),
        UBInt32("medium"),
        UBInt16("small"),
        UBInt8("tiny"),
    )
    """
    __slots__ = ["subcons", "include_name"]

    def __init__(self, name, *subcons, **kw):
        include_name = kw.pop("include_name", False)
        if kw:
            raise TypeError("the only keyword argument accepted "
                "is 'include_name'", kw)
        Construct.__init__(self, name)
        self.subcons = subcons
        self.include_name = include_name
        self._inherit_flags(*subcons)
        self._set_flag(self.FLAG_DYNAMIC)

    def _parse(self, stream, context):
        for candidate in self.subcons:
            start = stream.tell()
            trial_context = context.__copy__()
            try:
                result = candidate._parse(stream, trial_context)
            except ConstructError:
                # rewind and let the next candidate try
                stream.seek(start)
                continue
            context.__update__(trial_context)
            if self.include_name:
                return candidate.name, result
            return result
        raise SelectError("no subconstruct matched")

    def _build(self, obj, stream, context):
        if self.include_name:
            # obj is a (name, value) pair; the name picks the subcon
            wanted, obj = obj
            for candidate in self.subcons:
                if candidate.name == wanted:
                    candidate._build(obj, stream, context)
                    return
            raise SelectError("no subconstruct matched", obj)
        for candidate in self.subcons:
            # build into a scratch buffer, so that a failed attempt leaves
            # no partial output in the real stream
            scratch = StringIO()
            trial_context = context.__copy__()
            try:
                candidate._build(obj, scratch, trial_context)
            except Exception:
                # any failure simply means "try the next candidate"
                continue
            context.__update__(trial_context)
            stream.write(scratch.getvalue())
            return
        raise SelectError("no subconstruct matched", obj)

    def _sizeof(self, context):
        raise SizeofError("can't calculate size")
| |
| |
| #=============================================================================== |
| # stream manipulation |
| #=============================================================================== |
class Pointer(Subconstruct):
    """
    Changes the stream position to a given offset, where the construction
    should take place, and restores the stream position when finished (also
    when the subcon fails). A pointer takes no space of its own: its size
    is always 0.
    See also Anchor, OnDemand and OnDemandPointer.

    Notes:
    * requires a seekable stream.

    Parameters:
    * offsetfunc: a function that takes the context and returns an absolute
      stream position, where the construction would take place
    * subcon - the subcon to use at `offsetfunc()`

    Example:
    Struct("foo",
        UBInt32("spam_pointer"),
        Pointer(lambda ctx: ctx.spam_pointer,
            Array(5, UBInt8("spam"))
        )
    )
    """
    __slots__ = ["offsetfunc"]

    def __init__(self, offsetfunc, subcon):
        Subconstruct.__init__(self, subcon)
        self.offsetfunc = offsetfunc

    def _parse(self, stream, context):
        """parses the subcon at `offsetfunc(context)` and returns the result,
        leaving the stream where it was"""
        newpos = self.offsetfunc(context)
        origpos = stream.tell()
        stream.seek(newpos)
        try:
            return self.subcon._parse(stream, context)
        finally:
            # restore the position even if the subcon raised
            stream.seek(origpos)

    def _build(self, obj, stream, context):
        """builds the subcon at `offsetfunc(context)`, leaving the stream
        where it was"""
        newpos = self.offsetfunc(context)
        origpos = stream.tell()
        stream.seek(newpos)
        try:
            self.subcon._build(obj, stream, context)
        finally:
            # restore the position even if the subcon raised
            stream.seek(origpos)

    def _sizeof(self, context):
        return 0
| |
class Peek(Subconstruct):
    """
    Peeks at the stream: parses without changing the stream position.
    If the subcon fails with a FieldError while peeking (e.g., the end of
    the stream was reached), None is returned. The size of a peek is 0.
    See also Union.

    Notes:
    * requires a seekable stream.

    Parameters:
    * subcon - the subcon to peek at
    * perform_build - whether or not to perform building. by default this
      parameter is set to False, meaning building is a no-op.

    Example:
    Peek(UBInt8("foo"))
    """
    __slots__ = ["perform_build"]

    def __init__(self, subcon, perform_build = False):
        Subconstruct.__init__(self, subcon)
        self.perform_build = perform_build

    def _parse(self, stream, context):
        start = stream.tell()
        try:
            return self.subcon._parse(stream, context)
        except FieldError:
            return None
        finally:
            # whatever happened, the stream goes back to where it was
            stream.seek(start)

    def _build(self, obj, stream, context):
        if not self.perform_build:
            return
        self.subcon._build(obj, stream, context)

    def _sizeof(self, context):
        return 0
| |
class OnDemand(Subconstruct):
    """
    Allows for on-demand (lazy) parsing. When parsing, it will return a
    LazyContainer that represents a pointer to the data, but does not
    actually parse it from the stream until it's "demanded".
    By accessing the 'value' property of LazyContainers, you will demand the
    data from the stream. The data will be parsed and cached for later use.
    You can use the 'has_value' property to know whether the data has already
    been demanded.
    See also OnDemandPointer.

    Notes:
    * requires a seekable stream.

    Parameters:
    * subcon - the subcon to parse lazily
    * advance_stream - whether or not to advance the stream position. by
      default this is True, but if subcon is a pointer, this should be False.
    * force_build - whether or not to force build. If set to False, and the
      LazyContainer has not been demanded, building is a no-op.

    Example:
    OnDemand(Array(10000, UBInt8("foo")))
    """
    __slots__ = ["advance_stream", "force_build"]

    def __init__(self, subcon, advance_stream = True, force_build = True):
        Subconstruct.__init__(self, subcon)
        self.advance_stream = advance_stream
        self.force_build = force_build

    def _parse(self, stream, context):
        # record where the data lives instead of parsing it now
        lazy = LazyContainer(self.subcon, stream, stream.tell(), context)
        if self.advance_stream:
            # skip over the unparsed data (relative seek)
            stream.seek(self.subcon._sizeof(context), 1)
        return lazy

    def _build(self, obj, stream, context):
        if not isinstance(obj, LazyContainer):
            # a plain object is built directly
            self.subcon._build(obj, stream, context)
            return
        if self.force_build or obj.has_value:
            self.subcon._build(obj.value, stream, context)
            return
        if self.advance_stream:
            # nothing is written; just skip the space the data occupies
            stream.seek(self.subcon._sizeof(context), 1)
| |
class Buffered(Subconstruct):
    """
    Creates an in-memory buffered stream, which can undergo encoding and
    decoding prior to being passed on to the subconstruct.
    See also Bitwise.

    Note:
    * Do not use pointers inside Buffered

    Parameters:
    * subcon - the subcon which will operate on the buffer
    * decoder - a function that takes a string and returns a decoded
      string (used before parsing)
    * encoder - a function that takes a string and returns an encoded
      string (used after building)
    * resizer - a function that takes the size of the subcon and "adjusts"
      or "resizes" it according to the encoding/decoding process.

    Example:
    Buffered(BitField("foo", 16),
        encoder = decode_bin,
        decoder = encode_bin,
        resizer = lambda size: size / 8,
    )
    """
    __slots__ = ["encoder", "decoder", "resizer"]

    def __init__(self, subcon, decoder, encoder, resizer):
        Subconstruct.__init__(self, subcon)
        self.encoder = encoder
        self.decoder = decoder
        self.resizer = resizer

    def _parse(self, stream, context):
        """reads the (resized) amount of data, decodes it, and parses the
        subcon from an in-memory stream holding the decoded data"""
        data = _read_stream(stream, self._sizeof(context))
        stream2 = StringIO(self.decoder(data))
        return self.subcon._parse(stream2, context)

    def _build(self, obj, stream, context):
        """builds the subcon into an in-memory stream, encodes the result,
        and writes it out; raises FieldError if the encoded data does not
        have the expected (resized) length"""
        size = self._sizeof(context)
        stream2 = StringIO()
        self.subcon._build(obj, stream2, context)
        data = self.encoder(stream2.getvalue())
        # _write_stream validates the length itself, so the check also holds
        # when assertions are disabled, and it fails with a ConstructError
        _write_stream(stream, size, data)

    def _sizeof(self, context):
        return self.resizer(self.subcon._sizeof(context))
| |
class Restream(Subconstruct):
    """
    Wraps the stream with a read-wrapper (for parsing) or a
    write-wrapper (for building). The stream wrapper can buffer the data
    internally, reading it from- or writing it to the underlying stream
    as needed. For example, BitStreamReader reads whole bytes from the
    underlying stream, but returns them as individual bits.
    See also Bitwise.

    When the parsing or building is done, the wrapper's close method
    will be invoked. It can perform any finalization needed for the stream
    wrapper, but it must not close the underlying stream. Note that close
    is not invoked if the subcon raises.

    Note:
    * Do not use pointers inside Restream

    Parameters:
    * subcon - the subcon
    * stream_reader - the read-wrapper
    * stream_writer - the write wrapper
    * resizer - a function that takes the size of the subcon and "adjusts"
      or "resizes" it according to the encoding/decoding process.

    Example:
    Restream(BitField("foo", 16),
        stream_reader = BitStreamReader,
        stream_writer = BitStreamWriter,
        resizer = lambda size: size / 8,
    )
    """
    __slots__ = ["stream_reader", "stream_writer", "resizer"]

    def __init__(self, subcon, stream_reader, stream_writer, resizer):
        Subconstruct.__init__(self, subcon)
        self.stream_reader = stream_reader
        self.stream_writer = stream_writer
        self.resizer = resizer

    def _parse(self, stream, context):
        reader = self.stream_reader(stream)
        parsed = self.subcon._parse(reader, context)
        # let the wrapper finalize itself
        reader.close()
        return parsed

    def _build(self, obj, stream, context):
        writer = self.stream_writer(stream)
        self.subcon._build(obj, writer, context)
        # let the wrapper finalize itself (e.g., write out buffered data)
        writer.close()

    def _sizeof(self, context):
        inner_size = self.subcon._sizeof(context)
        return self.resizer(inner_size)
| |
| |
| #=============================================================================== |
| # miscellaneous |
| #=============================================================================== |
class Reconfig(Subconstruct):
    """
    Reconfigures a subconstruct: gives the inner subcon a new name and
    optionally sets and/or clears flags on the wrapper.

    The wrapper starts out with the subcon's own flags (conflags); the
    requested flags are then set first and cleared second.

    Parameters:
    * name - the new name
    * subcon - the subcon to reconfigure
    * setflags - the flags to set (default is 0)
    * clearflags - the flags to clear (default is 0)

    Example:
    Reconfig("foo", UBInt8("bar"))
    """
    __slots__ = []
    def __init__(self, name, subcon, setflags = 0, clearflags = 0):
        # Construct.__init__ is called directly so that the wrapper carries
        # the new name together with the subcon's flags
        inherited_flags = subcon.conflags
        Construct.__init__(self, name, inherited_flags)
        self.subcon = subcon
        # order matters: set, then clear
        self._set_flag(setflags)
        self._clear_flag(clearflags)
| |
class Anchor(Construct):
    """
    Yields the "anchor" (the current stream position) at the point where it
    is placed. Handy for turning relative offsets into absolute positions,
    or for measuring the size of constructs:
        absolute pointer = anchor + relative offset
        size = anchor_after - anchor_before
    See also Pointer.

    When parsing, the position is returned as the parsed value. When
    building, the given object is ignored and the position is stored in the
    context under the anchor's name. Nothing is read from or written to the
    stream, so the size is always 0.

    Notes:
    * requires a seekable stream.

    Parameters:
    * name - the name of the anchor

    Example:
    Struct("foo",
        Anchor("base"),
        UBInt8("relative_offset"),
        Pointer(lambda ctx: ctx.relative_offset + ctx.base,
            UBInt8("data")
        )
    )
    """
    __slots__ = []
    def _sizeof(self, context):
        return 0
    def _parse(self, stream, context):
        position = stream.tell()
        return position
    def _build(self, obj, stream, context):
        position = stream.tell()
        context[self.name] = position
| |
class Value(Construct):
    """
    A computed value.

    When parsing, the result of func(context) is returned as the parsed
    value. When building, the given object is ignored; the value is
    recomputed from the context and stored in the context under this
    construct's name. Nothing is read from or written to the stream, so
    the size is always 0. The construct is flagged as dynamic.

    Parameters:
    * name - the name of the value
    * func - a function that takes the context and returns the computed
      value

    Example:
    Struct("foo",
        UBInt8("width"),
        UBInt8("height"),
        Value("total_pixels", lambda ctx: ctx.width * ctx.height),
    )
    """
    __slots__ = ["func"]
    def __init__(self, name, func):
        Construct.__init__(self, name)
        self.func = func
        self._set_flag(self.FLAG_DYNAMIC)
    def _sizeof(self, context):
        return 0
    def _parse(self, stream, context):
        computed = self.func(context)
        return computed
    def _build(self, obj, stream, context):
        computed = self.func(context)
        context[self.name] = computed
| |
| #class Dynamic(Construct): |
| # """ |
| # Dynamically creates a construct and uses it for parsing and building. |
| # This allows you to change the construction tree on the fly. |
| # Deprecated. |
| # |
| # Parameters: |
| # * name - the name of the construct |
| # * factoryfunc - a function that takes the context and returns a new |
| # construct object which will be used for parsing and building. |
| # |
| # Example: |
| # def factory(ctx): |
| # if ctx.bar == 8: |
| # return UBInt8("spam") |
| # if ctx.bar == 9: |
| # return String("spam", 9) |
| # |
| # Struct("foo", |
| # UBInt8("bar"), |
| # Dynamic("spam", factory), |
| # ) |
| # """ |
| # __slots__ = ["factoryfunc"] |
| # def __init__(self, name, factoryfunc): |
| # Construct.__init__(self, name, self.FLAG_COPY_CONTEXT) |
| # self.factoryfunc = factoryfunc |
| # self._set_flag(self.FLAG_DYNAMIC) |
| # def _parse(self, stream, context): |
| # return self.factoryfunc(context)._parse(stream, context) |
| # def _build(self, obj, stream, context): |
| # return self.factoryfunc(context)._build(obj, stream, context) |
| # def _sizeof(self, context): |
| # return self.factoryfunc(context)._sizeof(context) |
| |
class LazyBound(Construct):
    """
    Lazily bound construct, useful for constructs that need to make cyclic
    references (linked-lists, expression trees, etc.).

    The target construct is obtained by calling bindfunc the first time it
    is needed (on parse, build or sizeof); the result is then kept and
    reused for subsequent calls.

    Parameters:
    * name - the name of the construct
    * bindfunc - a function, called without arguments, that returns the
      construct to bind to

    Example:
    foo = Struct("foo",
        UBInt8("bar"),
        LazyBound("next", lambda: foo),
    )
    """
    __slots__ = ["bindfunc", "bound"]
    def __init__(self, name, bindfunc):
        Construct.__init__(self, name)
        self.bound = None
        self.bindfunc = bindfunc
    def _resolve_bound(self):
        # bind on first use; afterwards hand back the stored construct
        target = self.bound
        if target is None:
            target = self.bound = self.bindfunc()
        return target
    def _parse(self, stream, context):
        return self._resolve_bound()._parse(stream, context)
    def _build(self, obj, stream, context):
        self._resolve_bound()._build(obj, stream, context)
    def _sizeof(self, context):
        return self._resolve_bound()._sizeof(context)
| |
class Pass(Construct):
    """
    A do-nothing construct, useful as the default case for Switch, or
    to indicate Enums.
    See also Switch and Enum.

    Parsing touches nothing and yields None; building expects the object
    to be None and writes nothing; the size is always 0.

    Notes:
    * this construct is a singleton. do not try to instantiate it, as it
      will not work :)

    Example:
    Pass
    """
    __slots__ = []
    def _sizeof(self, context):
        return 0
    def _parse(self, stream, context):
        return None
    def _build(self, obj, stream, context):
        assert obj is None
# replace the class with its single, nameless instance
Pass = Pass(None)
| |
class Terminator(Construct):
    """
    Asserts the end of the stream has been reached at the point it's placed.
    You can use this to ensure no more unparsed data follows.

    Parsing attempts to read one more unit from the stream and raises
    TerminatorError if anything comes back; building expects the object to
    be None and writes nothing; the size is always 0.

    Notes:
    * this construct is a singleton. do not try to instantiate it, as it
      will not work :)

    Example:
    Terminator
    """
    __slots__ = []
    def _sizeof(self, context):
        return 0
    def _parse(self, stream, context):
        trailing = stream.read(1)
        if trailing:
            raise TerminatorError("expected end of stream")
    def _build(self, obj, stream, context):
        assert obj is None
# replace the class with its single, nameless instance
Terminator = Terminator(None)
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |