## $Id: f4.py 1494 2009-04-30 13:10:41Z css1hs $
## -*- coding: utf-8 -*-
# ****************
# The F4 Algorithm
# ****************
#
#
# .. module:: pysteg.jsteg.f4
#
# :Module: pysteg.jsteg.f4
# :Date: $Date: 2010-07-16 10:43:38 +0100 (Fri, 16 Jul 2010) $
# :Revision: $Revision: 1559 $
# :Copyright: © 2009-10: University of Surrey, UK
# :Author: Hans Georg Schaathun <georg@schaathun.net> (2009-2010)
#
# ::
from random import SystemRandom
from .message import message
from stegbase import stegbase
from exceptions import *
from numpy import ceil
# Module-level random source backed by the operating system (SystemRandom).
# NOTE(review): not referenced by any code visible in this section of the
# module -- confirm whether it is still needed.
sysrnd = SystemRandom()
# Auxiliary functions
# ===================
#
# ::
def getn(k):
    """Return the code length n = (1 << k) - 1 for code dimension k."""
    shifted = 1 << k
    return shifted - 1
def f5decode(S, n, k=0):
    """Decode a block using an [n,k] Hamming code as the matrix code.

    S is read as a bit mask over the n positions of the block: for every
    bit i (0 <= i < n) that is set in S, the value (n - i) is XORed into
    the result.  The accumulated XOR is returned.

    k is accepted for interface compatibility but is ignored.
    """
    syndrome = 0
    for position in range(n):
        mask = 1 << position
        # A set bit contributes its position counted down from n.
        if S & mask:
            syndrome ^= n - position
    return syndrome
def f5param(exp, msglen):
    """Calculate code parameters for the matrix code.

    Tries the dimensions 1..7 in increasing order and stops at the first
    one whose f4usable(exp, dimension) estimate is smaller than msglen;
    the dimension just below it is selected.  If no dimension falls short,
    7 is selected.  A selected dimension of 0 means that even dimension 1
    was estimated to be insufficient.

    Returns a pair (n,k) where k is the selected dimension and
    n = (1 << k) - 1.
    """
    for candidate in range(1, 8):
        if f4usable(exp, candidate) < msglen:
            chosen = candidate - 1
            break
    else:
        # Every candidate dimension offers enough estimated capacity.
        chosen = 7
    length = (1 << chosen) - 1
    return (length, chosen)
# .. function:: f4usable(exp,i)
#
# This is taken from Westfeld's Java implementation of F5,
# and assumes that every other one is ignored.
#
# ::
def f4usable(exp, i):
    """Estimate the number of usable coefficients for F4/F5.

    exp is the expected capacity and i the code dimension (i >= 1; i = 0
    would give n = 0 and raise ZeroDivisionError).

    With n = (1 << i) - 1, the function computes exp*i/n, rounds it down
    to a multiple of n, and divides the result by 8.  All divisions are
    integer (floor) divisions, so integer input gives an integer result
    regardless of whether `/` means integer or true division in the
    running interpreter.
    """
    n = (1 << i) - 1
    # Explicit floor division: the estimate is meant to be integral.
    e1 = exp * i // n
    return (e1 - e1 % n) // 8
# The :class:`f4coef` Class
# =========================
#
# .. class:: f4coef
#
# ::
class f4coef(stegbase):
    """
    The F4 object handles a list of JPEG AC coefficients and
    embeds and extracts messages using F4.

    The object is used as a sequence of coefficients (it is indexed,
    measured with len() and searched with count()).  The traversal state
    (``next``, ``morebits``), the counters (``embedded``, ``changed``,
    ``thrown``) and the helpers ``_step``, ``_auxstep``, ``_blockRewind``,
    ``_blockProceed``, ``embed``, ``embedbit``, ``f5extract`` and
    ``length`` are not defined in this class; presumably they are
    inherited from ``stegbase`` -- verify against that class.
    """

# We view F4 as a special case of F5 and define code parameters
# already in this class::

    # Pair (n, k): code length and code dimension.  (0, 0) until
    # f5msgembed() selects a code.
    _code = (0, 0)

    def codedim(self):
        """Return the code dimension k (second entry of ``_code``)."""
        return self._code[1]

    def codelength(self):
        """Return the code length n (first entry of ``_code``)."""
        return self._code[0]

# .. method:: interpret(y)
#
# ::

    def interpret(self, y):
        """Decode a coefficient y F4 style.  Returns None when y=0.

        Positive coefficients decode to their least significant bit,
        negative coefficients to the inverse of their least significant
        bit.
        """

# We convert the input to integer for robustness.
# Why on Earth do we occasionally get float input?
#
# ::

        x = int(y)

# The demodulation itself is straight forward.
#
# ::

        if x == 0:
            return None
        if x > 0:
            return x & 1
        return 1 - (x & 1)

# Capacity Analysis
# -----------------
#
# .. method:: embedAnalysis
#
# ::

    def embedAnalysis(self):
        """Estimate the embedding capacity.

        Counts the coefficients with absolute value 0, 1 and 2, stores the
        counts in ``_zero``, ``_one`` and ``_two``, derives ``_large`` and
        the estimate ``expected`` (also stored as ``capacity``), and prints
        a summary.  When a code dimension > 0 is configured, a report for
        every dimension 1..7 is stored in the dict ``f5capacity``.
        """
        # A real list is required because count() is called on it.
        A = [abs(c) for c in self]
        self._zero = _zero = self.count(0)
        self._one = _one = A.count(1)
        self._two = _two = A.count(2)
        # Integer (floor) divisions: all quantities are coefficient counts.
        self._large = _large = len(self) - _zero - _one - _two // 4
        self.expected = _exp = _large + _one // 3
        print("[f4coef] Length: %s" % (len(self),))
        print("[f4coef] %i zeros; %i ones; %i twos" % (_zero, _one, _two))
        print("Expected capacity: %i bits" % (_exp))
        self.capacity = _exp

# For F5, we analyse capacity for each possible code::

        if self.codedim() > 0:
            self.f5capacity = {}
            for i in range(1, 8):
                self.f5capacity[i] = self.f5capacityreport(i)

    def f5capacityreport(self, i):
        """Print a report on F5 capacity.

        i is the code dimension; the code length is n = (1 << i) - 1.
        Reads ``expected``, ``_large`` and ``_one``, so embedAnalysis()
        must have stored them first.

        Returns a dict with the keys "Length", "Dim", "Capacity" and
        "Efficiency".  The efficiency is reported as 0.0 when the
        estimated number of changes is zero.
        """
        n = (1 << i) - 1
        U = f4usable(self.expected, i)
        # C estimates the number of changes needed for this code.
        C = float(self._large - self._large % (n + 1))
        C += self._one + self._one // 3 - self._one // (n + 1)
        C /= n + 1
        # Avoid a ZeroDivisionError when there is nothing to embed in.
        if C:
            E = 1.0 * U * 8 / C
        else:
            E = 0.0
        print("(%i,%i) code: %i bytes (efficiency %f bits per change)"
              % (n, i, U, E))
        return {"Length": n, "Dim": i, "Capacity": U, "Efficiency": E}

# Auxiliary Extraction Methods
# ----------------------------
#
# .. method:: getbit(pad=None)
#
# ::

    def getbit(self, pad=None):  # Extract single bit
        """Extract a single bit using F4.

        Returns the bit decoded from the coefficient at ``next`` and steps
        to the following coefficient.  When ``next`` is past the end of
        the coefficients, ``morebits`` is cleared and pad is returned
        instead.
        """
        if self.next >= len(self):
            self.morebits = False
            return pad
        assert self[self.next] != 0, "Undetected Capacity Violation"
        R = self.interpret(self[self.next])
        self._step()
        # After stepping, the pointer must not rest on a zero coefficient.
        if self.next < len(self):
            assert self[self.next] != 0, "Undetected Capacity Violation"
        assert R is not None
        return R

# Update coefficients.
# This should be the only method modifying coefficients.
#
# ::

    def flipnext(self):
        """Flip the message bit in the next position.
        (Does not proceed to the subsequent bit.)

        The magnitude of the coefficient at ``next`` is reduced by one.
        If it becomes zero, ``thrown`` is incremented.  checknext() is
        then called and ``changed`` is incremented.

        Raises UnusedCoefficientError if the coefficient is already zero.
        """
        if self[self.next] == 0:
            print("[flipnext] UnusedCoefficientError. self.next = %s"
                  % (self.next,))
            raise UnusedCoefficientError("Attempting to flip a zero")
        # Move the coefficient one step towards zero.
        if self[self.next] < 0:
            self[self.next] += 1
        elif self[self.next] > 0:
            self[self.next] -= 1
        if self[self.next] == 0:
            self.thrown += 1
        self.checknext()
        self.changed += 1

    def flipbit(self, no=0):
        """Flip bit no in the current block ignoring all zeros.
        The next coefficient counter is rewound to the start of the block.

        Raises CapacityException if the selected coefficient is zero.
        """
        self._blockRewind()  # Return to traverse the block from start
        for i in range(no):
            self._step()
        if self[self.next] == 0:
            print("[flipbit] %i/%i (%i)"
                  % (self.next, len(self), self[self.next]))
            raise CapacityException("Undetected Capacity Violation")
        self.flipnext()  # Flip this bit
        self._blockRewind()  # Return again to the start of the block

# Override checknext() to ensure that the right coefficients are used.
#
# ::

    def checknext(self):
        """Check that the next coefficient can be used.
        Move pointer if necessary.

        Zero coefficients are skipped with _auxstep().  Returns False when
        ``morebits`` is already false or _auxstep() reports failure;
        otherwise returns ``morebits``.
        """
        if not self.morebits:
            return False
        while self[self.next] == 0:
            if not self._auxstep():
                return False
        return self.morebits

# Auxiliary Embedding Methods
# ---------------------------
#
# .. method:: f4msgembed_aux(M)
#
# ::

    def f4msgembed_aux(self, M):  # Embed message (no status block)
        """Embed a message using F4 without any status word or length
        indicator.  (Auxiliary function.)

        Bits are read with M.getbit() until it returns None, and each is
        passed to embedbit().  Returns the result of embeddingReport().

        Raises CapacityException if ``morebits`` is false while message
        bits remain.
        """
        b = M.getbit()
        while b is not None:
            if not self.morebits:
                raise CapacityException("Capacity exceeded")
            self.embedbit(b)
            b = M.getbit()
        return self.embeddingReport()

# .. method:: f5embedblock(msg,k=None)
#
# embeds a single code block using F5.
#
# ::

    def f5embedblock(self, msg, k=None):
        """
        Embed a single F5 block (representing k message bits)
        at the current position.

        Raises CapacityException if ``morebits`` is false on entry.

        NOTE(review): with the default k=None the final
        ``self.embedded += k`` would raise TypeError; the only caller in
        this class always passes an integer k.
        """

# Firstly, let's check that we have at least one coefficient
# available for embedding.
#
# ::

        if not self.morebits:
            s = "Capacity violated at the start of a new F5 block"
            print("[f4embedblock] %s" % (s,))
            raise CapacityException(s)

# The principle of the embedding is to make a test extraction.
# If the result B is 0, no change is necessary.  If it is non-zero,
# it indicates the position which needs to be flipped.
#
# ::

        B = self.f5extract(k) ^ msg
        while B != 0:
            self.flipbit(B - 1)
            B = self.f5extract(k) ^ msg

# Once the embedding is successful, we can proceed to the next block.
# The :func:`_blockProceed` method updates all the pointers.
# Finally, we update the count of bits embedded.
#
# ::

        self._blockProceed()
        self.embedded += k
        return

# .. method:: f5msgembed( M, k=None )
#
# embeds a complete message using F5.  Clearly, if k=1, this equates
# to F4.
#
# ::

    def f5msgembed(self, M, k=None):
        """
        Embed a complete message including status block, using F5 block.
        If k=None, an optimal code is chosen based on estimated capacity.

        M must provide length(), getbit() and getbits(k).  ``attempted``
        is set to 8*M.length() + 32 and ``_code`` to the (n, k) pair used.
        Returns the result of embeddingReport().
        """
        L = M.length()
        self.attempted = L * 8 + 32

# If k is None, we try to calculate the optimal code based on the
# message length and estimated capacity::

        if k is None:
            (n, k) = f5param(self.expected, L + 4)
            if k == 0:
                print("Warning: Expected capacity is insufficient.")
                print("Using a [1,1] code and trying anyway.")
                k = 1
                # Keep n consistent with the announced [1,1] code.
                n = getn(k)

# If k is given and k>0, we use that as the code dimension.
# If k=0, we embed as F4, but k=0 will be stored in the status word::

        elif k == 0:
            n = 0
        else:
            n = getn(k)
        self._code = (n, k)

# The Status word stores the message length and the code dimension::

        # NOTE(review): L is not range-checked here; a length of 2**24 or
        # more would overlap the bits used for k.
        S = L | (k << 24)  # Make status word
        print("[f5msgembed] Status block: %s" % (S,))
        self.embed(S, 32)  # Embed status word
        self._blockProceed()

# If k=0 or k=1, we effectively use F4::

        if k <= 1:
            return self.f4msgembed_aux(M)  # Embed message

# If k>1, we use :func:`f5embedblock` and loop until the entire
# message has been embedded::

        print("Code parameters: %s" % ((n, k),))
        b = M.getbits(k)
        while b is not None:
            self.f5embedblock(b, k)
            b = M.getbits(k)
        return self.embeddingReport()

# Diagnostics
# -----------
#
# ::

    def embeddingReport(self):  # Diagnostics output
        """Print and return quantitative measures of the embedding
        performance.

        Reads ``_zero``, ``_one`` and ``_two`` (stored by embedAnalysis())
        and ``attempted`` (stored by f5msgembed()), in addition to the
        traversal counters.  Returns a dict of the measures.
        """
        total = self.length()
        print("Used %i of %i coefficients." % (self.next, total))
        print("Embedded %i bits." % (self.embedded))
        print("Flipped %i bits." % (self.changed))
        print("Discarded (zeroed) %i coefficients." % (self.thrown))
        nz = total - self._zero
        return {
            "Length": total,
            "Used": self.next,
            "Embedded": self.embedded,
            "Attempted": self.attempted,
            "Flipped": self.changed,
            "Discarded": self.thrown,
            "Zero": self._zero,
            "One": self._one,
            "Two": self._two,
            "Non-zero": nz,
            # Bits embedded per coefficient.
            "BPC": float(self.embedded) / total,
            "Code": self._code,
        }

# Main Interface Methods
# ----------------------
#
# Generic function identifiers for polymorphism
#
# ::

    def msgEmbed(self, *a, **kw):
        """Embed a message in the coefficients.

        All arguments are forwarded unchanged to f5msgembed().
        """
        return self.f5msgembed(*a, **kw)