# Source code for pysteg.jsteg.f4

## $Id: f4.py 1494 2009-04-30 13:10:41Z css1hs $
## -*- coding: utf-8 -*-

# ****************
# The F4 Algorithm
# ****************
# 
# 
# .. module:: pysteg.jsteg.f4
# 
# :Module:    pysteg.jsteg.f4
# :Date:      $Date: 2010-07-16 10:43:38 +0100 (Fri, 16 Jul 2010) $
# :Revision:  $Revision: 1559 $
# :Copyright: © 2009-10: University of Surrey, UK
# :Author:    Hans Georg Schaathun <georg@schaathun.net> (2009-2010)
# 
# ::
  
from random import SystemRandom
from .message import message
from stegbase import stegbase
from exceptions import *
from numpy import ceil

# Module-level random generator backed by the operating system's entropy
# source (random.SystemRandom).
# NOTE(review): nothing in the visible part of this module refers to it --
# check for external users before removing.
sysrnd = SystemRandom()

# Auxiliary functions
# ===================
# 
# ::

def getn(k):
    """Return the block length n = 2**k - 1 that goes with k message bits.

    This is the number of code bits read per block when a block carries
    k message bits (see f4coef.f5extract).
    """
    return (1 << k) - 1
def f5decode(S, n, k=0):
    """Decode a block using an [n,k] Hamming code as the matrix code.

    S is the block read as an integer and n is the number of code bits in
    it.  Bit i of S (counting from the least significant bit) contributes
    the value n-i, and the contributions of all set bits are XORed
    together.  Bits of S at position n and above are ignored.

    k is accepted for interface compatibility only; it is not used in the
    present version.
    """
    R = 0
    for i in range(n):
        # A set bit at position i toggles the value (n - i) in the result.
        if S & (1 << i):
            R ^= n - i
    return R
def f5param(exp, msglen):
    """Calculate code parameters for the matrix code.

    exp is the expected capacity handed on to f4usable() and msglen is the
    message length it is compared against.  The candidates i = 1..7 are
    tried in increasing order; the first i whose usable estimate is below
    msglen ends the search and k becomes i - 1 (so k is 0 when even i = 1
    is insufficient).  If no candidate falls short, k stays at 7.

    Returns a pair (n,k) with n = 2**k - 1.
    """
    k = 7
    for i in range(1, 8):
        if f4usable(exp, i) < msglen:
            k = i - 1
            break
    return ((1 << k) - 1, k)

# .. function:: f4usable(exp,i)
#
# This is taken from Westfeld's java implementation of F5,
# and assumes that every other one is ignored.
#
# ::

def f4usable(exp, i):
    """Estimate the number of usable coefficients for F4/F5.

    exp is the expected capacity and i the code dimension (i >= 1; i = 0
    gives n = 0 and raises ZeroDivisionError).  With n = 2**i - 1 the
    function computes exp*i/n, rounds it down to a multiple of n and
    divides by 8.

    All divisions are integer (floor) divisions.  The original relied on
    Python 2's integer ``/`` for this; ``//`` gives the same result for
    integer input on both Python 2 and Python 3.
    """
    n = (1 << i) - 1
    e1 = exp * i // n
    return (e1 - e1 % n) // 8

# The :class:`f4coef` Class
# =========================
#
# .. class:: f4coef
#
# ::

class f4coef(stegbase):
    """
    The F4 object handles a list of JPEG AC coefficients and
    embeds and extracts messages using F4.

    F4 is handled as the special case of F5 with one message bit per block.
    The coefficient pointer and the bit-level helpers used below (``next``,
    ``morebits``, ``getword``, ``embed``, ``embedbit``, ``_step``,
    ``_auxstep``, ``_blockRewind``, ``_blockProceed``, ``length``, and the
    counters ``embedded``, ``changed`` and ``thrown``) are not defined in
    this class; presumably they come from ``stegbase`` -- verify there.
    """

# We view F4 as a special case of F5 and define code parameters
# already in this class::

    _code = (0, 0)

    def codedim(self):
        """Return the code dimension k (second entry of ``_code``)."""
        return self._code[1]

    def codelength(self):
        """Return the code length n (first entry of ``_code``)."""
        return self._code[0]

# .. method:: interpret(y)
#
# ::

    def interpret(self, y):
        "Decode a coefficient x F4 style.  Returns None when x=0."
        # We convert the input to integer for robustness.
        # Why on Earth do we occasionally get float input?
        x = int(y)

        # The demodulation itself is straightforward: positive coefficients
        # carry their least significant bit, negative ones its complement.
        if x == 0:
            return None
        if x > 0:
            return x & 1
        return 1 - (x & 1)

# Capacity Analysis
# -----------------
#
# .. method:: embedAnalysis
#
# ::

    def embedAnalysis(self):
        """Estimate the embedding capacity.

        Counts coefficients of magnitude 0, 1 and 2 and stores the counts in
        ``_zero``, ``_one`` and ``_two``.  ``_large`` is the total length
        minus the zeros, the ones and a quarter of the twos; the estimate
        stored in ``expected`` and ``capacity`` is ``_large`` plus a third
        of the ones.  All divisions are integer divisions.

        When a code dimension is set (codedim() > 0), ``f5capacity`` is
        filled with one f5capacityreport() per code dimension 1..7.
        """
        # A real list is needed for .count(); map() is lazy on Python 3.
        A = [abs(c) for c in self]
        self._zero = _zero = self.count(0)
        self._one = _one = A.count(1)
        self._two = _two = A.count(2)
        self._large = _large = len(self) - _zero - _one - _two // 4
        self.expected = _exp = _large + _one // 3
        print("[f4coef] Length: %s" % (len(self),))
        print("[f4coef] %i zeros; %i ones; %i twos" % (_zero, _one, _two))
        print("Expected capacity: %i bits" % (_exp,))
        self.capacity = _exp

        # For F5, we analyse capacity for each possible code.
        if self.codedim() > 0:
            self.f5capacity = {}
            for i in range(1, 8):
                self.f5capacity[i] = self.f5capacityreport(i)

    def f5capacityreport(self, i):
        """Print a report on F5 capacity for code dimension i.

        Reads ``expected``, ``_large`` and ``_one``, so embedAnalysis() must
        have been run first.  Returns a dict with the keys "Length", "Dim",
        "Capacity" (f4usable estimate, labelled as bytes in the printed
        report) and "Efficiency" (bits per change).  Raises
        ZeroDivisionError if the estimated number of changes C is zero.
        """
        n = (1 << i) - 1
        U = f4usable(self.expected, i)
        C = float(self._large - self._large % (n + 1))
        C += self._one + self._one // 3 - self._one // (n + 1)
        C /= n + 1
        E = 1.0 * U * 8 / C
        print("(%i,%i) code: %i bytes (efficiency %f bits per change)"
              % (n, i, U, E))
        # The original had an unreachable ``self.capacity = _exp`` after
        # this return (``_exp`` is not defined in this method); it has been
        # dropped.  embedAnalysis() is what sets ``capacity``.
        return {"Length": n,
                "Dim": i,
                "Capacity": U,
                "Efficiency": E}

# Auxiliary Extraction Methods
# ----------------------------
#
# .. method:: f5extract(k=None):
#
# ::

    def f5extract(self, k=None):
        "Extract and decode a single block using F5."
        # If k is not given, let's see if it is an attribute.
        if k is None:
            k = self.codedim()
        assert k > 0

        # If k=1, we effectively use F4.
        if k == 1:
            b = self.getbit()
            if b is None:
                raise CapacityException("Incomplete Message")
            return b

        # If k>1, we calculate n from k and use F4 to extract code bits.
        n = getn(k)
        B = self.getword(n, pad=None)
        if B is None:
            raise CapacityException

        # Finally, we decode using the matrix code.
        return f5decode(B, n)

# .. method:: f5msgextract
#
# ::

    def f5msgextract(self):
        """Extract a complete message, reading the status block first.

        Returns a ``message`` object.  If the coefficients run out before
        the announced length has been read, a warning is printed and the
        partial message is returned.
        """
        # Where F4 just uses a list of bytes, we need a message object
        # which can accept an arbitrary number of bits at a time.
        R = message()

        # The status block stores the message length (L) in bytes,
        # and the code dimension (k).
        S = self.getword(32)
        print("[f5msgextract] Status block: %s" % (S,))
        L = S & 0x007FFFFF
        print("[f5msgextract] Extracting length: %s" % (L,))
        k = S >> 24
        print("[f5msgextract] Code dimension: k = %s" % (k,))
        if k == 0:
            print("[f5msgextract] Warning. Decoding F4.")
            k = 1

        # Calculate the message length (N) in blocks, and loop for each
        # block.
        N = int(ceil(L * 8.0 / k))
        print("[f5msgextract] Number of blocks: N = %s" % (N,))
        for i in range(N):
            # In the event that capacity has been exceeded during embedding,
            # we want to catch CapacityException.
            try:
                R.putbits(self.f5extract(k), k)
            except CapacityException as e:
                print("Warning: Incomplete Message")
                print(e)
                print("Extracted %i bytes of %i expected." % (R.length(), L))
                print("Last byte may be incomplete.")
                # The coefficients are exhausted; further blocks would only
                # fail again and repeat the warning.
                break

        # The above differs from F4 in two ways.  Firstly, it uses a message
        # object rather than a list.  Secondly, there is an extra warning
        # about incomplete bytes, as the message may have been truncated in
        # the middle of a byte.
        #
        # Since blocks are padded with zeros, we sometimes get an
        # extra 0 byte in R.  The drop() method drops this byte,
        # checking that it is indeed 0.
        while R.length() > L:
            R.drop()
        return R

# .. method:: getbit(pad=None)
#
# This is overridden to provide extra error checking.
# It is otherwise functionally equivalent to the getbit() method of
# the parent class.
#
# ::

    def getbit(self, pad=None):
        "Extract a single bit using F4."
        if self.next >= len(self):
            # Nothing left to read: record it and hand back the padding.
            self.morebits = False
            return pad
        assert self[self.next] != 0, "Undetected Capacity Violation"
        R = self.interpret(self[self.next])
        self._step()
        if self.next < len(self):
            assert self[self.next] != 0, "Undetected Capacity Violation"
        assert R is not None
        return R

# Update coefficients.
# This should be the only method modifying coefficients.
#
# ::

    def flipnext(self):
        """Flip the message bit in the next position.
        (Does not proceed to the subsequent bit.)

        The coefficient is moved one step towards zero.  If it becomes
        zero, ``thrown`` is incremented and checknext() moves the pointer
        on.  ``changed`` is incremented for every flip.  Raises
        UnusedCoefficientError if the coefficient is already zero.
        """
        pos = self.next
        if self[pos] == 0:
            print("[flipnext] UnusedCoefficientError. self.next = %s"
                  % (pos,))
            raise UnusedCoefficientError("Attempting to flip a zero")
        if self[pos] < 0:
            self[pos] += 1
        elif self[pos] > 0:
            self[pos] -= 1
        if self[pos] == 0:
            self.thrown += 1
            self.checknext()
        self.changed += 1

    def flipbit(self, no=0):
        """Flip bit no in the current block ignoring all zeros.
        The next coefficient counter is rewound to the start of the block.

        Raises CapacityException if the coefficient reached after ``no``
        steps is zero.
        """
        self._blockRewind()      # Return to traverse the block from start
        for _ in range(no):
            self._step()
        if self[self.next] == 0:
            print("[flipbit] %i/%i (%i)"
                  % (self.next, len(self), self[self.next]))
            raise CapacityException("Undetected Capacity Violation")
        self.flipnext()          # Flip this bit
        self._blockRewind()      # Return again to the start of the block

# Override checknext() to ensure that the right coefficients are used.
#
# ::

    def checknext(self):
        """Check that the next coefficient can be used.  Move pointer
        if necessary.

        Returns False when no more bits are available, otherwise the
        current value of ``morebits``.
        """
        if not self.morebits:
            return False
        # Zero coefficients carry no bit in F4, so step over them.
        while self[self.next] == 0:
            if not self._auxstep():
                return False
        return self.morebits

# Auxiliary Embedding Methods
# ---------------------------
#
# .. method:: f4msgembed_aux(M)
#
# ::

    def f4msgembed_aux(self, M):
        """Embed a message using F4 without any status word or
        length indicator.  (Auxiliary function.)

        Bits are taken from M with getbit() until it returns None.
        Raises CapacityException when ``morebits`` is false while message
        bits remain.  Returns embeddingReport().
        """
        b = M.getbit()
        while b is not None:
            if not self.morebits:
                raise CapacityException("Capacity exceeded")
            self.embedbit(b)
            b = M.getbit()
        return self.embeddingReport()

# .. method:: f5embedblock(msg,k=None)
#
# embeds a single code block using F5.
#
# ::

    def f5embedblock(self, msg, k=None):
        """
        Embed a single F5 block (representing k message bits)
        at the current position.

        If k is None the code dimension from codedim() is used.
        """
        # Resolve k up front so that the bit counter below always receives
        # a number; f5extract() applies the same default.
        if k is None:
            k = self.codedim()

        # Firstly, let's check that we have at least one coefficient
        # available for embedding.
        if not self.morebits:
            s = "Capacity violated at the start of a new F5 block"
            print("[f4embedblock] %s" % (s,))
            raise CapacityException(s)

        # The principle of the embedding is to make a test extraction.
        # If the result B is 0, no change is necessary.  If it is non-zero,
        # it indicates the position which needs to be flipped.
        B = self.f5extract(k) ^ msg
        while B != 0:
            self.flipbit(B - 1)
            B = self.f5extract(k) ^ msg

        # Once the embedding is successful, we can proceed to the next
        # block.  The _blockProceed() method updates all the pointers.
        # Finally, we update the count of bits embedded.
        self._blockProceed()
        self.embedded += k
        return

# .. method:: f5msgembed( M, k=None )
#
# embeds a complete message using F5.  Clearly, if k=1, this equates
# to F4.
#
# ::

    def f5msgembed(self, M, k=None):
        """
        Embed a complete message including status block, using F5 block.
        If k=None, an optimal code is chosen based on estimated capacity.

        Returns embeddingReport().
        """
        L = M.length()
        self.attempted = L * 8 + 32

        # If k is None, we try to calculate the optimal code based on the
        # message length and estimated capacity.
        if k is None:
            (n, k) = f5param(self.expected, L + 4)
            if k == 0:
                print("Warning: Expected capacity is insufficient.")
                print("Using a [1,1] code and trying anyway.")
                # NOTE(review): n keeps the value 0 returned by f5param
                # here, so _code becomes (0, 1) -- confirm this is intended.
                k = 1
        # If k is given and k>0, we use that as the code dimension.
        # If k=0, we embed as F4, but k=0 will be stored in the status word.
        elif k == 0:
            n = 0
        else:
            n = getn(k)
        self._code = (n, k)

        # The status word stores the message length and the code dimension.
        # NOTE(review): f5msgextract() masks the length with 0x007FFFFF, so
        # a length of 2**23 bytes or more would not be read back correctly.
        S = L | (k << 24)        # Make status word
        print("[f5msgembed] Status block: %s" % (S,))
        self.embed(S, 32)        # Embed status word
        self._blockProceed()

        # If k=0 or k=1, we effectively use F4.
        if k <= 1:
            return self.f4msgembed_aux(M)    # Embed message

        # If k>1, we use f5embedblock() and loop until the entire
        # message has been embedded.
        print("Code parameters: %s" % ((n, k),))
        b = M.getbits(k)
        while b is not None:
            self.f5embedblock(b, k)
            b = M.getbits(k)
        return self.embeddingReport()

# Diagnostics
# -----------
#
# ::

    def embeddingReport(self):
        """Print and return quantitative measures of the embedding
        performance.

        Reads ``_zero``, ``_one`` and ``_two``, so embedAnalysis() must
        have been run first.  "BPC" is the number of embedded bits divided
        by length().
        """
        total = self.length()
        print("Used %i of %i coefficients." % (self.next, total))
        print("Embedded %i bits." % (self.embedded,))
        print("Flipped %i bits." % (self.changed,))
        print("Discarded (zeroed) %i coefficients." % (self.thrown,))
        nz = total - self._zero
        return {"Length": total,
                "Used": self.next,
                "Embedded": self.embedded,
                "Attempted": self.attempted,
                "Flipped": self.changed,
                "Discarded": self.thrown,
                "Zero": self._zero,
                "One": self._one,
                "Two": self._two,
                "Non-zero": nz,
                "BPC": float(self.embedded) / total,
                "Code": self._code}

# Main Interface Methods
# ----------------------
#
# Generic function identifiers for polymorphism
#
# ::

    def msgExtract(self, *a, **kw):
        """Extract a message from the coefficients."""
        return self.f5msgextract(*a, **kw)

    def msgEmbed(self, *a, **kw):
        """Embed a message in the coefficients."""
        return self.f5msgembed(*a, **kw)