Source code for bravo.infini.packets

import functools

from construct import Struct, Container, Embed, MetaField
from construct import Switch, Const, Peek
from construct import OptionalGreedyRange
from construct import PascalString
from construct import UBInt8, UBInt16, UBInt32

DUMP_ALL_PACKETS = False

AlphaString = functools.partial(PascalString,
    length_field=UBInt16("length"),
    encoding="utf8")

[docs]def String(name): """ UTF-8 length-prefixed string. """ return PascalString(name, length_field=UBInt16("length"), encoding="utf-8")
[docs]def InfiniPacket(name, identifier, subconstruct): """ Common header structure for packets. This is possibly not the best way to go about building these kinds of things. """ header = Struct("header", # XXX Should this be Magic(chr(identifier))? Const(UBInt8("identifier"), identifier), UBInt8("flags"), UBInt32("length"), ) return Struct(name, header, subconstruct)
packets = { 0: InfiniPacket("ping", 0x00, Struct("payload", UBInt16("uid"), UBInt32("timestamp"), ) ), 255: InfiniPacket("disconnect", 0xff, Struct("payload", AlphaString("explanation"), ) ), "__default__": Struct("unknown", Struct("header", UBInt8("identifier"), UBInt8("flags"), UBInt32("length"), ), MetaField("data", lambda context: context["length"]), ), } packets_by_name = { "ping" : 0, "disconnect" : 255, } infinipacket_parser = Struct("parser", OptionalGreedyRange( Struct("packets", Peek(UBInt8("header")), Embed(Switch("packet", lambda context: context["header"], packets)), ), ), OptionalGreedyRange( UBInt8("leftovers"), ), ) def parse_packets(bytestream): container = infinipacket_parser.parse(bytestream) l = [(i.header, i.payload) for i in container.packets] leftovers = "".join(chr(i) for i in container.leftovers) if DUMP_ALL_PACKETS: for packet in l: print "Parsed packet %d" % packet[0] print packet[1] return l, leftovers
[docs]def make_packet(packet, *args, **kwargs): """ Constructs a packet bytestream from a packet header and payload. The payload should be passed as keyword arguments. Additional containers or dictionaries to be added to the payload may be passed positionally, as well. """ if packet not in packets_by_name: print "Couldn't find packet name %s!" % packet return "" header = packets_by_name[packet] for arg in args: kwargs.update(dict(arg)) payload = Container(**kwargs) if DUMP_ALL_PACKETS: print "Making packet %s (%d)" % (packet, header) print payload payload = packets[header].build(payload) return chr(header) + payload