Source code for pysteg.jsteg.message

## $Id: f4.py 1494 2009-04-30 13:10:41Z css1hs $
## -*- coding: utf-8 -*-

# *****************
# The Message Class
# *****************
# 
# .. module:: pysteg.jsteg.message
# 
# :Module:    pysteg.jsteg.message
# :Date:      $Date: 2010-07-16 10:43:38 +0100 (Fri, 16 Jul 2010) $
# :Revision:  $Revision: 1559 $
# :Copyright: © 2009-10: University of Surrey, UK
# :Author:    Hans Georg Schaathun <georg@schaathun.net> (2009-2010)
# 
# ::
  
from random import SystemRandom

# Shared SystemRandom instance; message.__init__ draws the bytes of a
# randomly generated message from it.
sysrnd = SystemRandom()

class message(list):
    """A message class for communications systems.

    The message is stored as a list of byte values, and this class lets
    the caller read it (:meth:`getbit`, :meth:`getbits`) or build it
    (:meth:`putbits`) a few bits at a time, most significant bit first.

    A message created without the `random` argument starts in receive
    mode, where :meth:`putbits` is allowed.  :meth:`reset` rewinds the
    read position to the first byte and disables :meth:`putbits`, so it
    should be called before reading back a message built with
    :meth:`putbits`.

    Not perfect; it should be extended to work on sequences of other
    objects, such as int32 or Unicode characters.
    """

    _available = 0     # Unused bits left in the byte being read/written
    _current = 0       # Value of the byte currently being read
    _next = 0          # Read: index of next byte; write: index of current byte
    _morebits = True   # Cleared once getbit() has run past the end
    _receive = False   # Disable putbits()

    def __init__(self, random=None, verbosity=1):
        """Create a new message.

        If `random` is not None, it should give the message length in
        bytes, and a random message of that length is generated.
        Otherwise an empty message is created in receive mode, ready to
        be filled with :meth:`putbits`.

        Diagnostic lines are printed when `verbosity` is greater than 2.
        """
        list.__init__(self)
        self.verbosity = verbosity
        if random is not None:
            self.extend([sysrnd.getrandbits(8) for x in range(random)])
            if verbosity > 2:
                # Single-argument call form: same output on Python 2 and 3.
                print("[message.__init__] New random message (%i bytes)"
                      % (len(self),))
        else:
            self._receive = True
            # putbits() increments _next before appending the first byte.
            self._next = -1
            if verbosity > 2:
                print("[message.__init__] Creating new, empty message.")

    def reset(self):
        """Rewind the message for reading from the first bit.

        This also leaves receive mode, so :meth:`putbits` is disabled
        afterwards.
        """
        self._next = 0
        self._available = 0
        self._receive = False
        self._morebits = True
        self._current = 0

    def drop(self):
        """Drop the last byte if zero.

        It is an error to call this method if the last byte is non-zero
        (an AssertionError is raised); an empty message raises IndexError.
        It is used by the extractor when the last byte only contains
        padding zeros used to fill up the last block.
        """
        N = self.length() - 1
        assert self[N] == 0
        del self[N]

    def putbits(self, B, N=1):
        """Insert new bits in the message.  Used at the receiver end.

        The `N` low-order bits of `B` are appended, most significant bit
        first; new bytes are started as needed.  Any bits of `B` above
        the lowest `N` are ignored.  The message must be in receive mode
        (an AssertionError is raised otherwise).  Returns None.
        """
        assert self._receive
        # Iterate rather than recurse so that arbitrarily long bit strings
        # cannot hit the interpreter's recursion limit.
        while True:
            if self._available == 0:
                # Start on next byte
                self._next += 1
                self.append(0)
                self._available = 8
            room = self._available
            if N <= room:
                # Everything that is left fits in the current byte.
                bits = B & ((1 << N) - 1)
                self[self._next] |= bits << (room - N)
                self._available -= N
                return
            # Fill the current byte with the `room` leading bits ...
            rest = N - room
            self[self._next] |= (B >> rest) & ((1 << room) - 1)
            self._available = 0
            # ... and carry the remaining `rest` bits over to the next byte.
            B &= (1 << rest) - 1
            N = rest

    def length(self):
        """Return the message length in bytes."""
        return len(self)

    def getbit(self):
        """Return the next bit of the message.

        None is returned if there are no remaining bits; this also
        records that the message is exhausted.
        """
        if self._available == 0:
            if self._next >= self.length():
                self._morebits = False
                return None
            # Load the next byte; its bits are consumed MSB first.
            self._current = self[self._next]
            self._next += 1
            self._available = 8
        R = (self._current >> (self._available - 1)) & 1
        self._available -= 1
        return R

    def getbits(self, count=1):
        """Return the next `count` bits of the message as an integer.

        The first bit read becomes the most significant bit of the
        result.  If the message runs out during the call, the bits that
        could not be read are left as zero; in particular a call made
        exactly at the end of the message returns 0.  None is returned
        only once an earlier read has already run past the end.
        """
        if not self._morebits:
            return None
        R = 0
        for i in range(count):
            B = self.getbit()
            if B is None:
                break
            R |= B << (count - 1 - i)
        return R

    def isequal(self, other):
        """Compare the message to another message object or arbitrary list.

        Returns True if both have the same length and equal elements at
        every index, and False otherwise.
        """
        L = len(self)
        if len(other) != L:
            return False
        for i in range(L):
            if self[i] != other[i]:
                return False
        return True