# Source code for pysteg.jsteg.f4

```python
## $Id: f4.py 1494 2009-04-30 13:10:41Z css1hs $
## -*- coding: utf-8 -*-

# ****************
# The F4 Algorithm
# ****************
#
#
# .. module:: pysteg.jsteg.f4
#
# :Module:    pysteg.jsteg.f4
# :Date:      $Date: 2010-07-16 10:43:38 +0100 (Fri, 16 Jul 2010) $
# :Revision:  \$Revision: 1559 \$
# :Author:    Hans Georg Schaathun <georg@schaathun.net> (2009-2010)
#
# ::

from random import SystemRandom
from .message import message
from stegbase import stegbase
from exceptions import *
from numpy import ceil

sysrnd = SystemRandom()

# Auxiliary functions
# ===================
#
# ::

def getn(k):
  """Return the block length n = 2**k - 1 that belongs to code parameter k."""
  power = 1 << k
  return power - 1

def f5decode(S, n, k=0):
  """Decode one block using an [n,k] Hamming code as the matrix code.

  S is an integer holding the extracted code bits.  A set bit at index i
  (counted from the least significant bit, 0 <= i < n) contributes the
  value n-i; the result is the XOR of all contributions.  Bits at index n
  and above are not inspected.

  The argument k is accepted but ignored in the present version.
  """
  syndrome = 0
  for bit in range(n):
    if (S >> bit) & 1:
      syndrome ^= n - bit
  return syndrome

def f5param(exp, msglen):
  """Calculate code parameters for the matrix code.  Returns a pair (n,k).

  The candidates i = 1..7 are tried in increasing order.  At the first i
  for which f4usable(exp,i) is smaller than msglen, the previous value
  k = i-1 is selected (this may be k = 0).  If no candidate falls short,
  k = 7 is used.  The returned n is 2**k - 1.
  """
  for candidate in range(1, 8):
    if f4usable(exp, candidate) < msglen:
      chosen = candidate - 1
      break
  else:
    # Every candidate offered enough room.
    chosen = 7
  return ((1 << chosen) - 1, chosen)

# .. function:: f4usable(exp,i)
#
# This is taken from Westfeld's java implementation of F5,
# and assumes that every other one is ignored.
#
# ::

def f4usable(exp, i):
  """Estimate the number of usable coefficients for F4/F5.

  exp is the expected capacity and i the code parameter; the block length
  is n = 2**i - 1.  The callers in this module compare the result with a
  message length in bytes and print it as a byte count.

  Raises ZeroDivisionError for i = 0, because n is then 0.
  """
  length = (1 << i) - 1
  scaled = exp * i / length
  remainder = scaled % length
  return (scaled - remainder) / 8

# The :class:`f4coef` Class
# =========================
#
# .. class:: f4coef
#
# ::

[docs]class f4coef(stegbase):
"""
The F4 object handles a list of JPEG AC coefficients and
embeds and extracts messages using F4.
"""

# We view F4 as a special case of F5 and define code parameters

_code = (0,0)
[docs]  def codedim(self): return self._code[1]
[docs]  def codelength(self): return self._code[0]

# .. method:: interpret(y)
#
# ::

[docs]  def interpret(self,y):
"Decode a coefficient x F4 style.  Returns None when x=0."

# We convert the input to integer for robustness.
# Why on Earth do we occasionally get float input?
#
# ::

x = int(y)

# The demodulation itself is straight forward.
#
# ::

if x == 0: return None
elif x > 0: return (x&1)
elif x < 0: return (1-(x&1))

# Capacity Analysis
# -----------------
#
# .. method:: embedAnalysis
#
# ::

[docs]  def embedAnalysis(self):
"""Estimate the embedding capacity."""
A = map ( abs, self )
self._zero  = _zero = self.count(0)
self._one   = _one = A.count(1)
self._two   = _two = A.count(2)
self._large = _large = len(self) - _zero - _one - _two/4
self.expected = _exp = _large + _one/3
print "[f4coef] Length:", len(self)
print "[f4coef]  %i zeros; %i ones; %i twos" % (_zero,_one,_two)
print "Expected capacity: %i bits" % (_exp)
self.capacity = _exp

# For F5, we analyse capacity for each possible code::

if self.codedim() > 0:
self.f5capacity = {}
for i in range(1,8): self.f5capacity[i] = self.f5capacityreport(i)

[docs]  def f5capacityreport(self,i):
"""Print a report on F5 capacity."""
n = (1<<i)-1 ;
U = f4usable(self.expected,i)
C = float ( self._large - self._large%(n+1) )
C += self._one + self._one/3 - self._one/(n+1)
C /= n+1
E = 1.0*U*8/C
print "(%i,%i) code: %i bytes (efficiency %f bits per change)" \
% (n,i,U,E)
return { "Length" : n, "Dim" : i, "Capacity" : U, "Efficiency" : E }

# Why do we update the capacity here?
#
# ::

self.capacity = _exp

# Auxiliary Extraction Methods
# ----------------------------
#
# .. method:: f5extract(k=None):
#
# ::

[docs]  def f5extract(self,k=None):
"Extract and decode a single block using F5."

# If k is not given, let's see if it is an attribute::

if k == None: k = self.codedim()
assert k > 0

# If k=1, we effectively use F4::

if k == 1:
b = self.getbit()
if b != None: return b
else: raise CapacityException, "Incomplete Message"

# If k>1, we calculate n from k::

n = getn(k)

# Use F4 to extract code bits::

if B == None:
raise CapacityException

# Finally, we decode using the matrix code::

return f5decode(B,n)

# .. method:: f5msgextract
#
# ::

[docs]  def f5msgextract( self ):

# Where F4 just uses a list of bytes, we need a message objecct
# which can accept an arbitrary number of bits at a time::

R = message( )

# The status block stores the message length (L) in bytes,
# and the code dimension (k)::

S = self.getword(32)
print "[f5msgextract] Status block:", S
L = S&0x007FFFFF
print "[f5msgextract] Extracting length:", L
k = S >> 24
print "[f5msgextract] Code dimension: k =", k
if k == 0:
print "[f5msgextract] Warning.  Decoding F4."
k = 1

# Calculate the message length (N) in blocks, and loop for each
# block::

N = int( ceil(L*8.0/k) )
print "[f5msgextract] Number of blocks: N =", N
for i in range(N):

# In the event that capacity has been exceeded during embedding,
# we want to catch :class:`CapacityException`.
#
# ::

try:
R.putbits ( self.f5extract(k), k )
except CapacityException, e:
print "Warning: Incomplete Message"
print e
print "Extracted %i bytes of %i expected." % (R.length(),L)
print "Last byte may be incomplete."

# The above differs from F4 in two ways.  Firstly, it uses a message
# object rather than a list.  Secondly, there is an extra warning about
# incomplete bytes, as the message may have been truncated in the middle
# of a byte.
#
# Since blocks are padded with zeros, we sometimes get an
# extra 0 byte in R.  The drop() method drops this byte,
# checking that it is indeed 0::

while R.length() > L: R.drop()
return R

#
#    This is overridden to provide extra error checking.
#    It is otherwise functionally equivalent to the getbit() method of
#    the parent class.
#
# ::

[docs]  def getbit( self, pad=None ): # Extract single bit
"Extract a single bit using F4."
if self.next >= len(self):
self.morebits = False
assert self[self.next] != 0, "Undetected Capacity Violation"
R = self.interpret( self[self.next] )
self._step()
if self.next < len(self):
assert self[self.next] != 0, "Undetected Capacity Violation"
assert R != None
return R

# Update coefficients.
# This should be the only method modifying coefficients.
#
# ::

[docs]  def flipnext(self):
"""Flip the message bit in the next position.
(Does not proceed to the subsequent bit.)"""
if self[self.next] == 0:
print "[flipnext] UnusedCoefficientError.  self.next =", self.next
raise UnusedCoefficientError, "Attempting to flip a zero"
if self[self.next] < 0: self[self.next] += 1
elif self[self.next] > 0: self[self.next] -= 1
if self[self.next] == 0: self.thrown += 1
self.checknext()
self.changed += 1
[docs]  def flipbit(self,no=0):
"""Flip bit no in the current block ignoring all zeros.
The next coefficient counter is rewound to the start of the block."""
for i in range(no): self._step()
if self[self.next] == 0:
print "[flipbit] %i/%i (%i)" % (self.next, len(self), self[self.next])
raise CapacityException, "Undetected Capacity Violation"
assert self[self.next] != 0, "Undetected Capacity Violation"
self.flipnext()      # Flip this bit
self._blockRewind()   # Return again to the start of the block

# Override checknext() to ensure that the right coefficients are used.
#
# ::

[docs]  def checknext( self ):
"""Check that the next coefficient can be used.
Move pointer if necessary."""
if not self.morebits: return False
while self[self.next] == 0:
if not self._auxstep(): return False
return self.morebits

# Auxiliary Embedding Methods
# ---------------------------
#
# .. method:: f4msgembed_aux(M)
#
# ::

[docs]  def f4msgembed_aux( self, M ): # Embed message (no status block)
"""Embed a message using F4 without any status word or length
indicator.  (Auxiliary function.)"""
b = M.getbit()
while b != None:
if not self.morebits:
raise CapacityException, "Capacity exceeded"
self.embedbit(b)
b = M.getbit()
return self.embeddingReport()

# .. method:: f5embedblock(msg,k=None)
#
#    embeds a single code block using F5.
#
# ::

[docs]  def f5embedblock(self,msg,k=None):
"""
Embed a single F5 block (representing k message bits)
at the current position.
"""

# Firstly, let's check that we have at least one coefficient
# available for embedding.
#
# ::

if not self.morebits:
s = "Capacity violated at the start of a new F5 block"
print "[f4embedblock]", s
raise CapacityException, s

# The principle of the embedding is to make a test extraction.
# If the result B is 0, no change is necessary.  If it is non-zero,
# it indicates the position which needs to be flipped.
#
# ::

B = self.f5extract(k) ^ msg
while B != 0:
self.flipbit(B-1)
B = self.f5extract(k) ^ msg

# Once the embedding is successful, we can proceed to the next block.
# The :func:`_blockProceed` method updates all the pointers.
# Finally, we update the count of bits embedded.
#
# ::

self._blockProceed()
self.embedded += k
return

# .. method:: f5msgembed( M, k=None )
#
#    embeds a complete message using F5.  Clearly, if k=1, this equates
#    to F4.
#
# ::

[docs]  def f5msgembed( self, M, k=None ):
"""
Embed a complete message including status block,  using F5 block.
If k=None, an optimal code is chosen based on estimated capacity.
"""
L = M.length()

self.attempted = L*8+32

# If k is None, we try to calculate the optimal code based on the
# message length and estimated capacity::

if k == None:
(n,k) = f5param(self.expected,L+4)
if k == 0:
print "Warning: Expected capacity is insufficient."
print "Using a [1,1] code and trying anyway."
k = 1

# If k is given and k>0, we use that as the code dimension.
# If k=0, we embed as F4, but k=0 will be stored in the status word::

elif k == 0: n = 0
else: n = getn(k)
self._code = (n,k)

# The Status word stores the message length and the code dimension::

S = L | (k<<24)     # Make status word
print "[f5msgembed] Status block:", S
self.embed(S,32)  # Embed status word
self._blockProceed()

# If k=1 or k=1, we effectively use F4::

if k <= 1: return self.f4msgembed_aux(M) # Embed message

# If k>1, we use :func:`f5embedblock` and loop until the entire
# message has been embedded::

print "Code parameters:", (n,k)
b = M.getbits(k)
while b != None:
self.f5embedblock(b,k)
b = M.getbits(k)
return self.embeddingReport()

# Diagnostics
# -----------
#
# ::

[docs]  def embeddingReport(self): # Diagnostics output
"""Print and return quantitative measures of the embedding
performance."""
print "Used %i of %i coefficients." \
% (self.next,self.length())
print "Embedded %i bits." % (self.embedded)
print "Flipped %i bits." % (self.changed)
print "Discarded (zeroed) %i coefficients." % (self.thrown)
nz = self.length() - self._zero
return {
"Length"    : self.length(),
"Used"      : self.next,
"Embedded"  : self.embedded,
"Attempted" : self.attempted,
"Flipped"   : self.changed,
"Zero" : self._zero,
"One"  : self._one,
"Two"  : self._two,
"Non-zero" : nz,
"BPC"      : float( self.embedded ) / self.length(),
"Code"     : self._code
}

# Main Interface Methods
# ----------------------
#
# Generic function identifiers for polymorphism
#
# ::

[docs]  def msgExtract( self, *a, **kw ):
"""Extract a message from the coefficients."""
return self.f5msgextract( *a, **kw )
[docs]  def msgEmbed( self, *a, **kw ):
"""Embed a message in the coefficients."""
return self.f5msgembed( *a, **kw )
```