Source code for pysteg.jsteg.message
## $Id: f4.py 1494 2009-04-30 13:10:41Z css1hs $
## -*- coding: utf-8 -*-
# *****************
# The Message Class
# *****************
#
# .. module:: pysteg.jsteg.message
#
# :Module: pysteg.jsteg.message
# :Date: $Date: 2010-07-16 10:43:38 +0100 (Fri, 16 Jul 2010) $
# :Revision: $Revision: 1559 $
# :Copyright: © 2009-10: University of Surrey, UK
# :Author: Hans Georg Schaathun <georg@schaathun.net> (2009-2010)
#
# ::
from random import SystemRandom
sysrnd = SystemRandom()
[docs]class message(list):
"""A message class for communications systems.
It allows getting a bit at a time from a sequence of bytes.
Not perfect; it should be extended to work on sequences of
other objects, such as int32 or Unicode characters."""
_available = 0
_current = 0
_next = 0
_morebits = True
_receive = False # Disable putbits()
def __init__(self,random=None,verbosity=1):
"""If random is not None, it should give the message length in bytes,
and a random message is generated."""
list.__init__([])
self.verbosity = verbosity
if random != None:
self.extend([ sysrnd.getrandbits(8) for x in range(random) ])
if verbosity > 2:
print "[message.__init__] New random message (%i bytes)" % (len(self),)
else:
self._receive = True
self._next = -1
if verbosity > 2:
print "[message.__init__] Creating new, empty message."
[docs] def reset(self):
self._next = 0
self._available = 0
self._receive = False
self._morebits = True
self._current = 0
[docs] def drop(self):
"""Drop the last byte if zero. It is an error to call
this method if the last byte is non-zero. It is used
by the extractor when the last byte only contains
padding zeros used to fill up the last block."""
N = self.length() - 1
assert self[N] == 0
del( self[N] )
[docs] def putbits(self,B,N=1):
"""Insert new bits in the message. Used at the receiver end."""
assert self._receive
if self._available == 0: # Start on next byte
self._next += 1
self.append(0)
self._available = 8
m = self._available
if N > m:
nn = N - m
# Mask out m bits for inclusion in the current byte
b_ = B >> nn
b_ &= (1<<m)-1
# Insert bits into current byte
self[self._next] |= b_
self._available = 0
# Mask out nn bits for inclusion in the subsequent bytes
bb = B
bb &= (1<<nn)-1
# Recursive call to fill subsequent bytes
return self.putbits(bb,nn)
else:
b_ = B
b_ <<= m - N
self[self._next] |= b_
self._available -= N
return
[docs] def length(self): return len(self)
[docs] def getbit(self):
"""Return the next bit of the messaage.
None is returned if there are no remaining bits."""
if self._available == 0:
if self._next >= self.length():
self._morebits = False
return None
self._current = self[self._next]
self._next += 1
self._available = 8
# R = self.current&1
# self.current >>= 1
R = self._current >> (self._available - 1)
R &= 1
self._available -= 1
return R
[docs] def getbits(self,count=1):
"""Return the next /count/ bits of the messaage.
None is returned if there are no remaining bits."""
R = 0
if not self._morebits: return None
for i in range(count):
B = self.getbit()
if B == None: break
R |= B<<(count-1-i)
return R
[docs] def isequal(self,other):
"""Compare the message to another message object or arbitrary list."""
L = len(self)
R = 0
if len(other) != L: return False
for i in range(L):
if self[i] != other[i]: return False
R += 1
### if len(other) != len(self): return False
return True