1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 | Lib/xdrlib.py
"""Implements (a subset of) Sun XDR -- eXternal Data Representation. See: RFC 1014 """ import struct try: from cStringIO import StringIO as _StringIO except ImportError: from StringIO import StringIO as _StringIO from functools import wraps __all__ = ["Error", "Packer", "Unpacker", "ConversionError"] # exceptions class Error(Exception): """Exception class for this module. Use: except xdrlib.Error, var: # var has the Error instance for the exception Public ivars: msg -- contains the message """ def __init__(self, msg): self.msg = msg def __repr__(self): return repr(self.msg) def __str__(self): return str(self.msg) class ConversionError(Error): pass def raise_conversion_error(function): """ Wrap any raised struct.errors in a ConversionError. """ @wraps(function) def result(self, value): try: return function(self, value) except struct.error as e: raise ConversionError(e.args[0]) return result class Packer: """Pack various data representations into a buffer.""" def __init__(self): self.reset() def reset(self): self.__buf = _StringIO() def get_buffer(self): return self.__buf.getvalue() # backwards compatibility get_buf = get_buffer @raise_conversion_error def pack_uint(self, x): self.__buf.write(struct.pack('>L', x)) @raise_conversion_error def pack_int(self, x): self.__buf.write(struct.pack('>l', x)) pack_enum = pack_int def pack_bool(self, x): if x: self.__buf.write('\0\0\0\1') else: self.__buf.write('\0\0\0\0') def pack_uhyper(self, x): try: self.pack_uint(x>>32 & 0xffffffffL) except (TypeError, struct.error) as e: raise ConversionError(e.args[0]) try: self.pack_uint(x & 0xffffffffL) except (TypeError, struct.error) as e: raise ConversionError(e.args[0]) pack_hyper = pack_uhyper @raise_conversion_error def pack_float(self, x): self.__buf.write(struct.pack('>f', x)) @raise_conversion_error def pack_double(self, x): self.__buf.write(struct.pack('>d', x)) def pack_fstring(self, n, s): if n < 0: raise ValueError, 'fstring size must be nonnegative' data = s[:n] n = ((n+3)//4)*4 data = data + (n - len(data)) * '\0' self.__buf.write(data) pack_fopaque = pack_fstring def pack_string(self, s): n = len(s) self.pack_uint(n) self.pack_fstring(n, s) pack_opaque = pack_string pack_bytes = pack_string def pack_list(self, list, pack_item): for item in list: self.pack_uint(1) pack_item(item) self.pack_uint(0) def pack_farray(self, n, list, pack_item): if len(list) != n: raise ValueError, 'wrong array size' for item in list: pack_item(item) def pack_array(self, list, pack_item): n = len(list) self.pack_uint(n) self.pack_farray(n, list, pack_item) class Unpacker: """Unpacks various data representations from the given buffer.""" def __init__(self, data): self.reset(data) def reset(self, data): self.__buf = data self.__pos = 0 def get_position(self): return self.__pos def set_position(self, position): self.__pos = position def get_buffer(self): return self.__buf def done(self): if self.__pos < len(self.__buf): raise Error('unextracted data remains') def unpack_uint(self): i = self.__pos self.__pos = j = i+4 data = self.__buf[i:j] if len(data) < 4: raise EOFError x = struct.unpack('>L', data)[0] try: return int(x) except OverflowError: return x def unpack_int(self): i = self.__pos self.__pos = j = i+4 data = self.__buf[i:j] if len(data) < 4: raise EOFError return struct.unpack('>l', data)[0] unpack_enum = unpack_int def unpack_bool(self): return bool(self.unpack_int()) def unpack_uhyper(self): hi = self.unpack_uint() lo = self.unpack_uint() return long(hi)<<32 | lo def unpack_hyper(self): x = self.unpack_uhyper() if x >= 0x8000000000000000L: x = x - 0x10000000000000000L return x def unpack_float(self): i = self.__pos self.__pos = j = i+4 data = self.__buf[i:j] if len(data) < 4: raise EOFError return struct.unpack('>f', data)[0] def unpack_double(self): i = self.__pos self.__pos = j = i+8 data = self.__buf[i:j] if len(data) < 8: raise EOFError return struct.unpack('>d', data)[0] def unpack_fstring(self, n): if n < 0: raise ValueError, 'fstring size must be nonnegative' i = self.__pos j = i + (n+3)//4*4 if j > len(self.__buf): raise EOFError self.__pos = j return self.__buf[i:i+n] unpack_fopaque = unpack_fstring def unpack_string(self): n = self.unpack_uint() return self.unpack_fstring(n) unpack_opaque = unpack_string unpack_bytes = unpack_string def unpack_list(self, unpack_item): list = [] while 1: x = self.unpack_uint() if x == 0: break if x != 1: raise ConversionError, '0 or 1 expected, got %r' % (x,) item = unpack_item() list.append(item) return list def unpack_farray(self, n, unpack_item): list = [] for i in range(n): list.append(unpack_item()) return list def unpack_array(self, unpack_item): n = self.unpack_uint() return self.unpack_farray(n, unpack_item) |