Source code for pysteg.jsteg.stegbase

## $Id$
## -*- coding: utf-8 -*-

# *******************
# The interface class
# *******************
# 
# :Module:    pysteg.jsteg.stegbase
# :Date:      $Date$
# :Revision:  $Revision$
# :Copyright: © 2010: University of Surrey, UK
# :Author:    Hans Georg Schaathun <georg@schaathun.net> (2010)
# 
# The stegbase class is a generic interface for Stego Objects, i.e.
# multimedia signals with the necessary methods to embed and extract
# steganographic messages.
# 
# Necessary exceptions are defined in the exceptions module.
# We will also need a Random Number Generator.  SystemRandom
# provides one based on physical events, i.e. potential true
# randomness.
# 
# ::

from exceptions import *

from random import SystemRandom
sysrnd = SystemRandom()

# The stegbase Abstract Class
# ===========================
# 
# To capture the nature of a 1-D signal, we use :class:`list` as the
# base class.  
# 
# ::

[docs]class stegbase(list): """ This is an abstract class, defining key components of the interface of the other JPEG steganography classes. """ capacity = None # :: def __init__(self,L,key=None,rndkey=False,verbosity=1): """ Create a new object containing the signal L, which can be any list. The list L will usually be obtained from jpeg.getsignal(), and reinserted with jpeg.setsignal(L) after embedding. To embed in random locations, the signal L should be shuffled with a random permutation before being passed to the stego object. The jpeg class provided the functionality to do this. The key is currently not used at all, and is intended for encryption in future versions. If key is None and rndkey is True, a random key is generated (and this can be retrieved by the getkey() method. """ list.__init__(self,map(int,L)) # In practical use, a Stego-System should have two secret keys. # One should be used to permute the signal to get embedding in # pseudo-random locations. The other should be used to encrypt # the message before the embedding. Objects derived from stegbase # assume that the signal is permuted before the object is initialised. # It does, however, accept a key to be specified. It is not currently # used, but could be used for encryption. # # This key can be specified in three ways. # If key is not None, this is used as the secret key. # # :: if key != None: self.key = key elif rndkey: self.key = [ sysrnd.getrandbits(16) for x in range(16) ] else: self.key = None self._reset() self.verbosity = verbosity if self.verbosity > 1: print "New stego-algorithm object:", type(self) # :: def _reset(self): """Reinitialise the sequence for reembedding or extraction.""" self.next = 0 # Pointer to next coefficient self.morebits = True # There are yet some coefficients to use self.changed = 0 # Number of modified coefficients self.thrown = 0 # Number of newly created zeroes self.embedded = 0 # Number of embedded (channel) bits self.checknext() # Skip zeros self.blockstart = self.next # Current working block (only used in F5) self.attempted = 0 # No embedding attempted yet self.embedAnalysis()
[docs] def length(self): """ Return the length of the signal, i.e. the total number of coefficients. """ return len(self)
[docs] def getkey(self): """Return the key used to encrypt the message.""" return self.key
[docs] def getcapacity(self): """Return an estimate of the capacity.""" return self.capacity # The low-level methods # --------------------- # # The overall embedding and extractions methods are algorithm independent. # The following methods define how message bits are modulated and demodulated # in individual signal samples, including what samples may be used. # Overriding these methods is sufficient to create JSteg, Outguess 0.1, # F3, or F4. # # ::
[docs] def interpret(self,y): """ Demodulate the given signal sample y. This must be overridden by any child class. """ raise NotImplementedError, "Modulation has not been defined."
[docs] def flipnext(self): """ Flip the message bit in the next position. (Does not proceed to the subsequent bit.) This must be overridden by any child class. """ raise NotImplementedError, "Modulation has not been defined."
[docs] def checknext( self ): """ Check that the next coefficient can be used, and move the pointer if necessary. This must be overridden by any child class. """ raise NotImplementedError, \ "Set of appropriate coefficients has not been defined." # Auxiliary Embedding and Extraction # ---------------------------------- # # Embedding # ^^^^^^^^^ # .. method:: embedbit( msg ) # # embeds a single bit::
[docs] def embedbit( self, msg ): # Embed single bit """ Embed a single bit in the next applicable position. This rarely requires overriding. It uses the interpret() and flipnext() methods, which should be overridden to define the modulation. """ # The calling method is expected to check that there are samples # left for embedding. If we have already reached the end of the # signal, it is an error:: if not self.morebits: raise CapacityError, "Undetected capacity overflow" # Exceeding capacity, the index will be out of bounds. We just # catch the exception and return False to indicate that the bit # could not be embedded:: try: while self.interpret(self[self.next]) != msg: if not self.checknext(): return False else: self.flipnext() except IndexError, e: return False # When the bit is successfully embedded, we return update the counter, # increment the pointer, and return True:: self._step() self.embedded += 1 return True # .. method:: embed( msg, N=32 ) # # embeds a block of N bits::
[docs] def embed( self, msg, N=32 ): """ Embed N bits. It uses the embedbit() method and rarely requires overriding. """ for i in range(N): b = msg >> (N-1-i) b &= 1 if not self.morebits: raise CapacityException, "Capacity exceeded" if not self.embedbit(b): raise CapacityException, "Capacity exceeded" return True # Extraction # ^^^^^^^^^^ # # ::
[docs] def getbit( self, pad=None ): # Extract single bit """Extract a single bit. This uses the interpret() method, and will rarely need overriding. Instead the interpret() method is overridden. """ if self.next >= len(self): self.morebits = False return pad R = self.interpret( self[self.next] ) self._step() return R
[docs] def getword( self, N=1, pad=0 ): # Extract block (N bits) """ Extract N bits. This uses the getbit() method, and rarely needs overriding. """ R = 0 if not self.morebits: raise CapacityException, "End of sequence reached" for i in range(N): b = self.getbit(pad=pad) if b == None: return None R |= b <<(N-1-i) return R # Stepping Methods # ---------------- # # The stepping methods are used to traverse the sequence. # No other functions should be allowed to update the next pointer # # ::
def _auxstep(self): """ Proceed to the next coefficient. This is an auxiliary method which rarely needs overriding. """ self.next += 1 if self.next >= len(self): self.morebits = False return self.morebits def _step(self): """ Proceed to the next coefficient. This uses the checknext() method which must be overridden for different stego systems. This method rarely requires overriding. """ if not self._auxstep(): return False return self.checknext() # Extra Auxiliary Methods for F5 # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # # The matrix coding of F5 means that we sometimes need to redo # the embedding for a full block. Therefore we need additional # stepping methods to rewind and start a block over. # # :: def _blockRewind(self): """Return to the start of the current block.""" self.next = self.blockstart if not self.morebits: if self.verbosity > 0: print "[blockRewind] Resetting morebits flag." self.morebits = True self.checknext() self.blockstart = self.next def _blockProceed(self): """Proceed to the next block.""" self.blockstart = self.next # Diagnostic Methods # ------------------ # # ::
[docs] def embedAnalysis(self): "Estimate the embedding capacity. (To be overridden by all children.)" raise NotImplementedError, \ "Set of appropriate coefficients has not been defined."
[docs] def embeddingReport(self): # Diagnostics output """Print and return quantitative measures of the embedding performance.""" print "Used %i of %i coefficients." \ % (self.next,self.length()) print "Embedded %i bits." % (self.embedded) print "Flipped %i bits." % (self.changed) return { "Length" : self.length(), "Used" : self.next, "Embedded" : self.embedded, "Attempted" : self.attempted, "Flipped" : self.changed, } # Front-End Methods for Embedding and Extraction # ---------------------------------------------- # # ::
[docs] def msgEmbed( self, M ): # Embed entire message """Embed a message, including a status word.""" L = M.length() self.embed(L,32) self.attempted = L*8+32 return self.msgembed_aux( M )
[docs] def msgembed_aux( self, M ): # Embed message (no status block) """Embed a message using F4 without any status word or length indicator. (Auxiliary function.)""" b = M.getbit() while b != None: if not self.morebits: raise CapacityException, "Capacity exceeded" self.embedbit(b) b = M.getbit() return self.embeddingReport()
[docs] def msgExtract( self ): # Extract entire message R = [] S = self.getword(32) if self.verbosity > 1: print "[msgExtract] Status block:", S L = S&0x007FFFFF if self.verbosity > 1: print "[msgExtract] Extracting length:", L for i in range(L): # We prepare for capacity being exceeded by catching the # :class:`CapacityException`:: try: R.append ( self.getword(8) ) except CapacityException, e: print "Warning: Incomplete message" print "Extracted %i bytes of %i expected." % (len(R),L) if __debug__ == 1: print e break return R