bitfrost assertion checks were lost

Daniel Drake dsd at laptop.org
Tue Jul 6 14:27:24 EDT 2010


A few months ago we enabled Python optimizations in the OLPC builds
causing python's "assert" statement to do nothing.
Martin Langhoff pointed out that we're dropping some important parts
of bitfrost's code with this change - we do want to be making these
checks.
Here's a patch to restore the previous behaviour, review appreciated.
-------------- next part --------------
diff --git a/bitfrost/__init__.py b/bitfrost/__init__.py
index e69de29..d3e691f 100644
--- a/bitfrost/__init__.py
+++ b/bitfrost/__init__.py
@@ -0,0 +1,10 @@
+def _assert(expr):
+    """
+    When compiled with -O, assertions are dropped. However, bitfrost uses
+    assertions in various places as part of the security-sensitive code flow.
+    All assertions have been switched to use this function so that the previous
+    "unoptimized" behaviour is kept.
+    """
+    if not expr:
+        raise AssertionError
+
diff --git a/bitfrost/leases/core.py b/bitfrost/leases/core.py
index e8fbbcb..3818bd6 100644
--- a/bitfrost/leases/core.py
+++ b/bitfrost/leases/core.py
@@ -29,6 +29,7 @@ http://wiki.laptop.org/go/Firmware_Key_and_Signature_Formats#Antitheft.2FActivat
 """
 
 import bitfrost.util.json as json
+from bitfrost import _assert
 from bitfrost.leases.crypto import verify_lease
 from bitfrost.leases.errors import *
 from bitfrost.leases.keys import LEASE_KEYS
@@ -50,13 +51,13 @@ def find_lease (this_sn, this_uuid, lease_contents):
         # perhaps this is a singleton lease
         version, leases = 1, { this_sn: lease_contents }
     try:
-        assert isinstance(version, int)
+        _assert(isinstance(version, int))
         if version != 1:
             # version of the multiple-lease marshalling, *not* the version
             # of the lease itself.
             raise UnsupportedLeaseFormat(version)
-        assert isinstance(leases, dict)
-        assert len(leases.keys()) > 0
+        _assert(isinstance(leases, dict))
+        _assert(len(leases.keys()) > 0)
     except:
         raise InvalidLeaseData()
 
diff --git a/bitfrost/leases/crypto.py b/bitfrost/leases/crypto.py
index de38f50..f28434e 100644
--- a/bitfrost/leases/crypto.py
+++ b/bitfrost/leases/crypto.py
@@ -26,6 +26,7 @@ for use when checking security tokens.
 """
 
 import time, re, sys
+from bitfrost import _assert
 from bitfrost.leases.errors import *
 import bitfrost.util.pyverify as pyverify # libtomcrypt binding
     
@@ -45,8 +46,8 @@ def _find_matching_key(keyid, valid_keys):
             # key01: data\n
             #  3 2 2  N  1
             try:
-                assert k[0:7] == 'key01: '
-                assert k[-1] == '\n'
+                _assert(k[0:7] == 'key01: ')
+                _assert(k[-1] == '\n')
                 key = k[7:-1].decode('hex')
             except:
                 pass # our keylist has an invalid key =(
@@ -77,7 +78,7 @@ def date_cmp(a, b):
 
     """
     a, b = str(a), str(b)
-    assert len(a) == 16 and len(b) == 16
+    _assert(len(a) == 16 and len(b) == 16)
     if a == b: return 0
     # special-case infinity.
     if a == "00000000T000000Z": return 1
@@ -108,28 +109,28 @@ def check_expiration_func():
 def _verify_sig01(certified_data, sig, valid_keys, __):
     """Decode and verify a signature in the sig01 format."""
     try:
-        assert isinstance(sig, str)
-        assert len(sig) > (3+2+2+6+1+64+1+1)
+        _assert(isinstance(sig, str))
+        _assert(len(sig) > (3+2+2+6+1+64+1+1))
         # minus 38
-        assert sig[0:7] == 'sig01: '
+        _assert(sig[0:7] == 'sig01: ')
         hashname = sig[7:13] 
-        assert hashname == 'sha256'
-        assert sig[13] == ' '
+        _assert(hashname == 'sha256')
+        _assert(sig[13] == ' ')
         keyid = sig[14:78]
-        assert len(keyid) == 64
-        assert sig[78] == ' '
-        assert sig[-1] == '\n'
+        _assert(len(keyid) == 64)
+        _assert(sig[78] == ' ')
+        _assert(sig[-1] == '\n')
         sig = sig[79:-1].decode('hex')
     except:
         raise InvalidSignatureData()
     
     # find a matching key in valid_keys, which will be a parsed key list.
     key = _find_matching_key(keyid, valid_keys)
-    assert key is not None
+    _assert(key is not None)
 
     # Crypto check
     try:
-        assert hashname == 'sha256'
+        _assert(hashname == 'sha256')
         pyverify.verify_buffer(key, certified_data, sig)
     except:
         raise VerificationFailure()
@@ -139,13 +140,13 @@ def _verify_sig01(certified_data, sig, valid_keys, __):
 def _verify_sig02(certified_data, sig, valid_keys, sn):
     """Decode and verify a signature in the sig01 format."""
     try:
-        assert isinstance(sig, str)
-        assert len(sig) > (3+2+2+6+1+64+1+16+1+1)
-        assert sig[0:7] == 'sig02: '
-        assert sig[-1] == '\n'
+        _assert(isinstance(sig, str))
+        _assert(len(sig) > (3+2+2+6+1+64+1+16+1+1))
+        _assert(sig[0:7] == 'sig02: ')
+        _assert(sig[-1] == '\n')
         sgs = sig[7:].split()
-        assert len(sgs) > 0
-        assert (len(sgs) % 4) == 0
+        _assert(len(sgs) > 0)
+        _assert((len(sgs) % 4) == 0)
     except:
         raise InvalidSignatureData()
 
@@ -184,7 +185,7 @@ def _verify_sig02(certified_data, sig, valid_keys, sn):
             data += ('key01: %s\n' % next_key)
         # okay, validate signature.
         try:
-            assert hashname == 'sha256'
+            _assert(hashname == 'sha256')
             pyverify.verify_buffer(key, data, sg_sig.decode('hex'))
         except:
             raise VerificationFailure()
@@ -281,10 +282,10 @@ def verify_sig(certified_data, sig, valid_keys, sn):
     
     """
     try:
-        assert isinstance(sig, str)
-        assert len(sig) > 6
-        assert sig[0:3] == 'sig'
-        assert sig[5] == ':'
+        _assert(isinstance(sig, str))
+        _assert(len(sig) > 6)
+        _assert(sig[0:3] == 'sig')
+        _assert(sig[5] == ':')
         version = int(sig[3:5])
     except:
         raise InvalidSignatureData()
@@ -296,17 +297,17 @@ def verify_sig(certified_data, sig, valid_keys, sn):
     raise InvalidSignatureData() # unknown signature version
 
 def parse_act01(lease):
-    assert isinstance(lease, str)
-    assert len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1)
-    assert lease[0:7] == 'act01: '
+    _assert(isinstance(lease, str))
+    _assert(len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1))
+    _assert(lease[0:7] == 'act01: ')
     sn = lease[7:18]
-    assert lease[18] == ' '
+    _assert(lease[18] == ' ')
     disposition = lease[19]
-    assert lease[20] == ' '
+    _assert(lease[20] == ' ')
     expiration = lease[21:37]
-    assert lease[37:41] == ' sig'
+    _assert(lease[37:41] == ' sig')
     sig = lease[38:]
-    assert lease[-1] == '\n'
+    _assert(lease[-1] == '\n')
     return sn, disposition, expiration, sig
 
 def _verify_act01(sn, uuid, lease, valid_keys):
@@ -327,7 +328,7 @@ def _verify_act01(sn, uuid, lease, valid_keys):
     #  3 2 2      11      111     16   1 3 2 2    6  1  64 1  N  1
     try:
         lease_sn, disposition, expiration, sig = parse_act01(lease)
-        assert lease_sn == str(sn)
+        _assert(lease_sn == str(sn))
     except:
         raise InvalidLeaseData()
 
@@ -421,10 +422,10 @@ def verify_act(sn, uuid, lease, valid_keys):
     LeaseExpired
     """
     try:
-        assert isinstance(lease, str)
-        assert len(lease) > 6
-        assert lease[0:3] == 'act'
-        assert lease[5] == ':'
+        _assert(isinstance(lease, str))
+        _assert(len(lease) > 6)
+        _assert(lease[0:3] == 'act')
+        _assert(lease[5] == ':')
         version = int(lease[3:5])
     except:
         raise InvalidLeaseData()
@@ -450,19 +451,19 @@ def _verify_dev01(sn, uuid, devkey, valid_keys):
     # dev01: serial-number d expiration sig01: sha256 keyid data\n
     #  3 2 2      11      111     16   1 3 2 2   6   1  64 1  N  1
     try:
-        assert isinstance(devkey, str)
-        assert len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1)
-        assert devkey[0:7] == 'dev01: '
-        assert devkey[7:18] == str(sn)
-        assert devkey[18] == ' '
+        _assert(isinstance(devkey, str))
+        _assert(len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1))
+        _assert(devkey[0:7] == 'dev01: ')
+        _assert(devkey[7:18] == str(sn))
+        _assert(devkey[18] == ' ')
         disposition = devkey[19]
-        assert disposition == 'A'
-        assert devkey[20] == ' '
+        _assert(disposition == 'A')
+        _assert(devkey[20] == ' ')
         expiration = devkey[21:37]
-        assert expiration == '00000000T000000Z'
-        assert devkey[37:41] == ' sig'
+        _assert(expiration == '00000000T000000Z')
+        _assert(devkey[37:41] == ' sig')
         sig = devkey[38:]
-        assert devkey[-1] == '\n'
+        _assert(devkey[-1] == '\n')
     except:
         raise InvalidDevKeyData()
 
@@ -552,11 +553,11 @@ def verify_dev(sn, uuid, devkey, valid_keys):
     LeaseExpired
     """
     try:
-        assert isinstance(devkey, str)
-        assert len(devkey) > 6
-        assert devkey[0:3] == 'dev'
-        assert devkey[5] == ':'
-        version = int(devkey[3:5])
+        _assert(isinstance(devkey, str))
+        _assert(len(devkey) > 6)
+        _assert(devkey[0:3] == 'dev')
+        _assert(devkey[5] == ':')
+        version = int(devkey[3:5]))
     except:
         raise InvalidDevKeyData()
 
diff --git a/bitfrost/update/irsync.py b/bitfrost/update/irsync.py
index d7628a4..102b274 100644
--- a/bitfrost/update/irsync.py
+++ b/bitfrost/update/irsync.py
@@ -7,6 +7,7 @@ directories which are out-of-date."""
 from __future__ import with_statement
 import os, os.path, shutil, stat, sys
 from bitfrost.contents.utils import mkhashes, open_envel, UnifiedContents
+from bitfrost import _assert
 from subprocess import check_call
 HASH, MODE, UID, GID, XATTR = 'h', 'm', 'u#', 'g#', 'x'
 
@@ -25,7 +26,7 @@ def rsync (src, dst, path, recursive, verbosity=0, link_dest=None):
         link_path = os.path.join(link_dest, path)
         # rsync will complain if we give it a link-dest which doesn't actually
         # exist, isn't a directory, or if the link-dest is relative.
-        assert os.path.isabs(link_path)
+        _assert(os.path.isabs(link_path))
         if os.path.isdir(link_path):
             args[2:2] = [ '--link-dest', link_path ]
     if verbosity > 1: args[2:2] = ['-v']
diff --git a/bitfrost/update/microformat.py b/bitfrost/update/microformat.py
index b3806b2..5b4d50c 100644
--- a/bitfrost/update/microformat.py
+++ b/bitfrost/update/microformat.py
@@ -64,6 +64,8 @@ from __future__ import division
 from HTMLParser import HTMLParser, HTMLParseError
 import urlparse
 
+from bitfrost import _assert
+
 _DEBUG_PARSER = False
 """Check that the HTML input is well-formed.
 Otherwise, silently "do our best" with what we're given."""
@@ -154,9 +156,9 @@ class _UpdateHTMLParser(HTMLParser):
             self.in_activity -= 1
             if self.in_activity == 0:
                 if _DEBUG_PARSER:
-                    assert self.in_activity_id == 0
-                    assert self.in_activity_version == 0
-                    assert self.in_activity_url == 0
+                    _assert(self.in_activity_id == 0)
+                    _assert(self.in_activity_version == 0)
+                    _assert(self.in_activity_url == 0)
                 if self.last_id is not None and self.last_id.strip() == '':
                     self.last_id = None
                 if len(self.last_urls)>0 and (self.last_id is None or
diff --git a/bitfrost/update/setup.py b/bitfrost/update/setup.py
index f8f4b05..9800a14 100644
--- a/bitfrost/update/setup.py
+++ b/bitfrost/update/setup.py
@@ -9,6 +9,7 @@ from subprocess import call, check_call, Popen, PIPE
 import os, os.path, re, struct, sys, threading, time
 from bitfrost.leases.crypto import verify_sig, verify_dev, check_expiration_func
 from bitfrost.leases.keys import OS_KEYS, DEVELOPER_KEYS
+from bitfrost import _assert
 
 def _is_partitioned():
     # see if / is a different device from /bootpart
@@ -210,38 +211,38 @@ def clean_versions(report, partitioned, cur_hash, new_hash):
         keep.extend(os.listdir('/versions/sticky'))
     except: pass # if /versions/sticky doesn't exist, don't worry about it.
     for f in os.listdir('/versions/pristine'):
-        assert len(f) > 0 and '.' not in f # defensive programming.
+        _assert(len(f) > 0 and '.' not in f) # defensive programming.
         if f in keep: continue
         report(0, 'Deleting old pristine version %s' % f)
         rmrf('/versions/pristine/%s' % f, ignore_errors=True)
     for f in os.listdir('/versions/run'):
-        assert len(f) > 0 and '.' not in f # defensive programming.
+        _assert(len(f) > 0 and '.' not in f) # defensive programming.
         if f in keep: continue
         report(0, 'Deleting old run version %s' % f)
         rmrf('/versions/run/%s' % f, ignore_errors=True)
     for f in os.listdir('/versions/contents'):
-        assert len(f) > 0 and '.' not in f # defensive programming.
+        _assert(len(f) > 0 and '.' not in f) # defensive programming.
         if f in keep: continue
         report(0, 'Deleting old contents for version %s' % f)
         rmrf('/versions/contents/%s' % f, ignore_errors=True)
     # 3b. Delete corresponding members of /boot-versions if using a boot partition
     if partitioned:
         for f in os.listdir('/bootpart/boot-versions'):
-            assert len(f) > 0 and '.' not in f
+            _assert(len(f) > 0 and '.' not in f)
             if f in keep: continue
             report(0, 'Deleting old boot version %s' % f)
             rmrf('/bootpart/boot-versions/%s' % f, ignore_errors=True)
     else:
         # 4. Clean up old configs.
         for f in os.listdir('/versions/configs'):
-            assert len(f) > 0 and not f.startswith('.') # defensive programming.
+            _assert(len(f) > 0 and not f.startswith('.')) # defensive programming.
             if f == os.path.basename(os.readlink('/versions/boot').rstrip('/')):
                 continue
             report(0, 'Deleting old boot configuration %s' % f)
             rmrf('/versions/configs/%s' % f, ignore_errors=True)
     # 5. Clean up updates directory.
     for f in os.listdir('/versions/updates'):
-        assert len(f) > 0 and not f.startswith('.') # defensive programming.
+        _assert(len(f) > 0 and not f.startswith('.')) # defensive programming.
         if f.startswith(shorten_hash(new_hash)): continue
         report(0, 'Deleting old incomplete update %s' % f)
         rmrf('/versions/updates/%s' % f, ignore_errors=True)
diff --git a/bitfrost/util/json.py b/bitfrost/util/json.py
index 87e1065..c60c6f8 100644
--- a/bitfrost/util/json.py
+++ b/bitfrost/util/json.py
@@ -265,7 +265,7 @@ if __name__ == '__main__':
         except WriteException:
             pass # this is what should happen
     def expectWrite(obj, expect_str):
-        assert write(obj) == expect_str
+        _assert(write(obj) == expect_str)
     def expectRead(s, result):
         expectWrite(read(s), write(result))
 
diff --git a/bitfrost/util/urlrange.py b/bitfrost/util/urlrange.py
index 9ef56ad..a5fcbf1 100755
--- a/bitfrost/util/urlrange.py
+++ b/bitfrost/util/urlrange.py
@@ -20,6 +20,8 @@ import urllib2
 import urlparse
 import zipfile
 
+from bitfrost import _assert
+
 _cache = {}
 """Cache of temporary files used to download URLs."""
 _redirect_cache = {}
@@ -274,7 +276,7 @@ class _InnerWrapper(object):
             self._setup()
             size = self.length - self.offset
         self._setup()
-        assert self.offset <= self.length
+        _assert(self.offset <= self.length)
         if (self.offset + size) > self.length:
             size = self.length - self.offset # fixup overlong requests
         if size == 0: return [] # we're at the end of our rope.
@@ -299,7 +301,7 @@ class _InnerWrapper(object):
             self.close()
             raise IOError('could not read: %s' % res.reason)
         data = res.read(size)
-        assert len(data) == size
+        _assert(len(data) == size)
         self.offset += size
         res.close()
         return data
@@ -448,13 +450,13 @@ def _main():
     # test redirects
     url = 'http://etoys.laptop.org/xo'
     if True:
-        assert urlopen(url).url == (url+'/')
+        _assert(urlopen(url).url == (url+'/'))
         # check that cached copy is stored with the canonical url
         saved = urlretrieve(url)
         print saved
         os.unlink(saved)
-        assert url in _redirect_cache
-        assert _redirect_cache[url] in _cache
+        _assert(url in _redirect_cache)
+        _assert(_redirect_cache[url] in _cache)
 
     # test the progress callbacks.
     import sys


More information about the Devel mailing list