bitfrost assertion checks were lost
Daniel Drake
dsd at laptop.org
Tue Jul 6 14:27:24 EDT 2010
A few months ago we enabled Python optimizations (-O) in the OLPC builds,
causing Python's "assert" statement to do nothing.
Martin Langhoff pointed out that we're dropping some important parts
of bitfrost's code with this change - we do want to be making these
checks.
Here's a patch to restore the previous behaviour, review appreciated.
-------------- next part --------------
diff --git a/bitfrost/__init__.py b/bitfrost/__init__.py
index e69de29..d3e691f 100644
--- a/bitfrost/__init__.py
+++ b/bitfrost/__init__.py
@@ -0,0 +1,10 @@
+def _assert(expr):
+ """
+ When compiled with -O, assertions are dropped. However, bitfrost uses
+ assertions in various places as part of the security-sensitive code flow.
+ All assertions have been switched to use this function so that the previous
+ "unoptimized" behaviour is kept.
+ """
+ if not expr:
+ raise AssertionError
+
diff --git a/bitfrost/leases/core.py b/bitfrost/leases/core.py
index e8fbbcb..3818bd6 100644
--- a/bitfrost/leases/core.py
+++ b/bitfrost/leases/core.py
@@ -29,6 +29,7 @@ http://wiki.laptop.org/go/Firmware_Key_and_Signature_Formats#Antitheft.2FActivat
"""
import bitfrost.util.json as json
+from bitfrost import _assert
from bitfrost.leases.crypto import verify_lease
from bitfrost.leases.errors import *
from bitfrost.leases.keys import LEASE_KEYS
@@ -50,13 +51,13 @@ def find_lease (this_sn, this_uuid, lease_contents):
# perhaps this is a singleton lease
version, leases = 1, { this_sn: lease_contents }
try:
- assert isinstance(version, int)
+ _assert(isinstance(version, int))
if version != 1:
# version of the multiple-lease marshalling, *not* the version
# of the lease itself.
raise UnsupportedLeaseFormat(version)
- assert isinstance(leases, dict)
- assert len(leases.keys()) > 0
+ _assert(isinstance(leases, dict))
+ _assert(len(leases.keys()) > 0)
except:
raise InvalidLeaseData()
diff --git a/bitfrost/leases/crypto.py b/bitfrost/leases/crypto.py
index de38f50..f28434e 100644
--- a/bitfrost/leases/crypto.py
+++ b/bitfrost/leases/crypto.py
@@ -26,6 +26,7 @@ for use when checking security tokens.
"""
import time, re, sys
+from bitfrost import _assert
from bitfrost.leases.errors import *
import bitfrost.util.pyverify as pyverify # libtomcrypt binding
@@ -45,8 +46,8 @@ def _find_matching_key(keyid, valid_keys):
# key01: data\n
# 3 2 2 N 1
try:
- assert k[0:7] == 'key01: '
- assert k[-1] == '\n'
+ _assert(k[0:7] == 'key01: ')
+ _assert(k[-1] == '\n')
key = k[7:-1].decode('hex')
except:
pass # our keylist has an invalid key =(
@@ -77,7 +78,7 @@ def date_cmp(a, b):
"""
a, b = str(a), str(b)
- assert len(a) == 16 and len(b) == 16
+ _assert(len(a) == 16 and len(b) == 16)
if a == b: return 0
# special-case infinity.
if a == "00000000T000000Z": return 1
@@ -108,28 +109,28 @@ def check_expiration_func():
def _verify_sig01(certified_data, sig, valid_keys, __):
"""Decode and verify a signature in the sig01 format."""
try:
- assert isinstance(sig, str)
- assert len(sig) > (3+2+2+6+1+64+1+1)
+ _assert(isinstance(sig, str))
+ _assert(len(sig) > (3+2+2+6+1+64+1+1))
# minus 38
- assert sig[0:7] == 'sig01: '
+ _assert(sig[0:7] == 'sig01: ')
hashname = sig[7:13]
- assert hashname == 'sha256'
- assert sig[13] == ' '
+ _assert(hashname == 'sha256')
+ _assert(sig[13] == ' ')
keyid = sig[14:78]
- assert len(keyid) == 64
- assert sig[78] == ' '
- assert sig[-1] == '\n'
+ _assert(len(keyid) == 64)
+ _assert(sig[78] == ' ')
+ _assert(sig[-1] == '\n')
sig = sig[79:-1].decode('hex')
except:
raise InvalidSignatureData()
# find a matching key in valid_keys, which will be a parsed key list.
key = _find_matching_key(keyid, valid_keys)
- assert key is not None
+ _assert(key is not None)
# Crypto check
try:
- assert hashname == 'sha256'
+ _assert(hashname == 'sha256')
pyverify.verify_buffer(key, certified_data, sig)
except:
raise VerificationFailure()
@@ -139,13 +140,13 @@ def _verify_sig01(certified_data, sig, valid_keys, __):
def _verify_sig02(certified_data, sig, valid_keys, sn):
"""Decode and verify a signature in the sig01 format."""
try:
- assert isinstance(sig, str)
- assert len(sig) > (3+2+2+6+1+64+1+16+1+1)
- assert sig[0:7] == 'sig02: '
- assert sig[-1] == '\n'
+ _assert(isinstance(sig, str))
+ _assert(len(sig) > (3+2+2+6+1+64+1+16+1+1))
+ _assert(sig[0:7] == 'sig02: ')
+ _assert(sig[-1] == '\n')
sgs = sig[7:].split()
- assert len(sgs) > 0
- assert (len(sgs) % 4) == 0
+ _assert(len(sgs) > 0)
+ _assert((len(sgs) % 4) == 0)
except:
raise InvalidSignatureData()
@@ -184,7 +185,7 @@ def _verify_sig02(certified_data, sig, valid_keys, sn):
data += ('key01: %s\n' % next_key)
# okay, validate signature.
try:
- assert hashname == 'sha256'
+ _assert(hashname == 'sha256')
pyverify.verify_buffer(key, data, sg_sig.decode('hex'))
except:
raise VerificationFailure()
@@ -281,10 +282,10 @@ def verify_sig(certified_data, sig, valid_keys, sn):
"""
try:
- assert isinstance(sig, str)
- assert len(sig) > 6
- assert sig[0:3] == 'sig'
- assert sig[5] == ':'
+ _assert(isinstance(sig, str))
+ _assert(len(sig) > 6)
+ _assert(sig[0:3] == 'sig')
+ _assert(sig[5] == ':')
version = int(sig[3:5])
except:
raise InvalidSignatureData()
@@ -296,17 +297,17 @@ def verify_sig(certified_data, sig, valid_keys, sn):
raise InvalidSignatureData() # unknown signature version
def parse_act01(lease):
- assert isinstance(lease, str)
- assert len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1)
- assert lease[0:7] == 'act01: '
+ _assert(isinstance(lease, str))
+ _assert(len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1))
+ _assert(lease[0:7] == 'act01: ')
sn = lease[7:18]
- assert lease[18] == ' '
+ _assert(lease[18] == ' ')
disposition = lease[19]
- assert lease[20] == ' '
+ _assert(lease[20] == ' ')
expiration = lease[21:37]
- assert lease[37:41] == ' sig'
+ _assert(lease[37:41] == ' sig')
sig = lease[38:]
- assert lease[-1] == '\n'
+ _assert(lease[-1] == '\n')
return sn, disposition, expiration, sig
def _verify_act01(sn, uuid, lease, valid_keys):
@@ -327,7 +328,7 @@ def _verify_act01(sn, uuid, lease, valid_keys):
# 3 2 2 11 111 16 1 3 2 2 6 1 64 1 N 1
try:
lease_sn, disposition, expiration, sig = parse_act01(lease)
- assert lease_sn == str(sn)
+ _assert(lease_sn == str(sn))
except:
raise InvalidLeaseData()
@@ -421,10 +422,10 @@ def verify_act(sn, uuid, lease, valid_keys):
LeaseExpired
"""
try:
- assert isinstance(lease, str)
- assert len(lease) > 6
- assert lease[0:3] == 'act'
- assert lease[5] == ':'
+ _assert(isinstance(lease, str))
+ _assert(len(lease) > 6)
+ _assert(lease[0:3] == 'act')
+ _assert(lease[5] == ':')
version = int(lease[3:5])
except:
raise InvalidLeaseData()
@@ -450,19 +451,19 @@ def _verify_dev01(sn, uuid, devkey, valid_keys):
# dev01: serial-number d expiration sig01: sha256 keyid data\n
# 3 2 2 11 111 16 1 3 2 2 6 1 64 1 N 1
try:
- assert isinstance(devkey, str)
- assert len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1)
- assert devkey[0:7] == 'dev01: '
- assert devkey[7:18] == str(sn)
- assert devkey[18] == ' '
+ _assert(isinstance(devkey, str))
+ _assert(len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1))
+ _assert(devkey[0:7] == 'dev01: ')
+ _assert(devkey[7:18] == str(sn))
+ _assert(devkey[18] == ' ')
disposition = devkey[19]
- assert disposition == 'A'
- assert devkey[20] == ' '
+ _assert(disposition == 'A')
+ _assert(devkey[20] == ' ')
expiration = devkey[21:37]
- assert expiration == '00000000T000000Z'
- assert devkey[37:41] == ' sig'
+ _assert(expiration == '00000000T000000Z')
+ _assert(devkey[37:41] == ' sig')
sig = devkey[38:]
- assert devkey[-1] == '\n'
+ _assert(devkey[-1] == '\n')
except:
raise InvalidDevKeyData()
@@ -552,11 +553,11 @@ def verify_dev(sn, uuid, devkey, valid_keys):
LeaseExpired
"""
try:
- assert isinstance(devkey, str)
- assert len(devkey) > 6
- assert devkey[0:3] == 'dev'
- assert devkey[5] == ':'
- version = int(devkey[3:5])
+ _assert(isinstance(devkey, str))
+ _assert(len(devkey) > 6)
+ _assert(devkey[0:3] == 'dev')
+ _assert(devkey[5] == ':')
+ version = int(devkey[3:5])
except:
raise InvalidDevKeyData()
diff --git a/bitfrost/update/irsync.py b/bitfrost/update/irsync.py
index d7628a4..102b274 100644
--- a/bitfrost/update/irsync.py
+++ b/bitfrost/update/irsync.py
@@ -7,6 +7,7 @@ directories which are out-of-date."""
from __future__ import with_statement
import os, os.path, shutil, stat, sys
from bitfrost.contents.utils import mkhashes, open_envel, UnifiedContents
+from bitfrost import _assert
from subprocess import check_call
HASH, MODE, UID, GID, XATTR = 'h', 'm', 'u#', 'g#', 'x'
@@ -25,7 +26,7 @@ def rsync (src, dst, path, recursive, verbosity=0, link_dest=None):
link_path = os.path.join(link_dest, path)
# rsync will complain if we give it a link-dest which doesn't actually
# exist, isn't a directory, or if the link-dest is relative.
- assert os.path.isabs(link_path)
+ _assert(os.path.isabs(link_path))
if os.path.isdir(link_path):
args[2:2] = [ '--link-dest', link_path ]
if verbosity > 1: args[2:2] = ['-v']
diff --git a/bitfrost/update/microformat.py b/bitfrost/update/microformat.py
index b3806b2..5b4d50c 100644
--- a/bitfrost/update/microformat.py
+++ b/bitfrost/update/microformat.py
@@ -64,6 +64,8 @@ from __future__ import division
from HTMLParser import HTMLParser, HTMLParseError
import urlparse
+from bitfrost import _assert
+
_DEBUG_PARSER = False
"""Check that the HTML input is well-formed.
Otherwise, silently "do our best" with what we're given."""
@@ -154,9 +156,9 @@ class _UpdateHTMLParser(HTMLParser):
self.in_activity -= 1
if self.in_activity == 0:
if _DEBUG_PARSER:
- assert self.in_activity_id == 0
- assert self.in_activity_version == 0
- assert self.in_activity_url == 0
+ _assert(self.in_activity_id == 0)
+ _assert(self.in_activity_version == 0)
+ _assert(self.in_activity_url == 0)
if self.last_id is not None and self.last_id.strip() == '':
self.last_id = None
if len(self.last_urls)>0 and (self.last_id is None or
diff --git a/bitfrost/update/setup.py b/bitfrost/update/setup.py
index f8f4b05..9800a14 100644
--- a/bitfrost/update/setup.py
+++ b/bitfrost/update/setup.py
@@ -9,6 +9,7 @@ from subprocess import call, check_call, Popen, PIPE
import os, os.path, re, struct, sys, threading, time
from bitfrost.leases.crypto import verify_sig, verify_dev, check_expiration_func
from bitfrost.leases.keys import OS_KEYS, DEVELOPER_KEYS
+from bitfrost import _assert
def _is_partitioned():
# see if / is a different device from /bootpart
@@ -210,38 +211,38 @@ def clean_versions(report, partitioned, cur_hash, new_hash):
keep.extend(os.listdir('/versions/sticky'))
except: pass # if /versions/sticky doesn't exist, don't worry about it.
for f in os.listdir('/versions/pristine'):
- assert len(f) > 0 and '.' not in f # defensive programming.
+ _assert(len(f) > 0 and '.' not in f) # defensive programming.
if f in keep: continue
report(0, 'Deleting old pristine version %s' % f)
rmrf('/versions/pristine/%s' % f, ignore_errors=True)
for f in os.listdir('/versions/run'):
- assert len(f) > 0 and '.' not in f # defensive programming.
+ _assert(len(f) > 0 and '.' not in f) # defensive programming.
if f in keep: continue
report(0, 'Deleting old run version %s' % f)
rmrf('/versions/run/%s' % f, ignore_errors=True)
for f in os.listdir('/versions/contents'):
- assert len(f) > 0 and '.' not in f # defensive programming.
+ _assert(len(f) > 0 and '.' not in f) # defensive programming.
if f in keep: continue
report(0, 'Deleting old contents for version %s' % f)
rmrf('/versions/contents/%s' % f, ignore_errors=True)
# 3b. Delete corresponding members of /boot-versions if using a boot partition
if partitioned:
for f in os.listdir('/bootpart/boot-versions'):
- assert len(f) > 0 and '.' not in f
+ _assert(len(f) > 0 and '.' not in f)
if f in keep: continue
report(0, 'Deleting old boot version %s' % f)
rmrf('/bootpart/boot-versions/%s' % f, ignore_errors=True)
else:
# 4. Clean up old configs.
for f in os.listdir('/versions/configs'):
- assert len(f) > 0 and not f.startswith('.') # defensive programming.
+ _assert(len(f) > 0 and not f.startswith('.')) # defensive programming.
if f == os.path.basename(os.readlink('/versions/boot').rstrip('/')):
continue
report(0, 'Deleting old boot configuration %s' % f)
rmrf('/versions/configs/%s' % f, ignore_errors=True)
# 5. Clean up updates directory.
for f in os.listdir('/versions/updates'):
- assert len(f) > 0 and not f.startswith('.') # defensive programming.
+ _assert(len(f) > 0 and not f.startswith('.')) # defensive programming.
if f.startswith(shorten_hash(new_hash)): continue
report(0, 'Deleting old incomplete update %s' % f)
rmrf('/versions/updates/%s' % f, ignore_errors=True)
diff --git a/bitfrost/util/json.py b/bitfrost/util/json.py
index 87e1065..c60c6f8 100644
--- a/bitfrost/util/json.py
+++ b/bitfrost/util/json.py
@@ -265,7 +265,7 @@ if __name__ == '__main__':
except WriteException:
pass # this is what should happen
def expectWrite(obj, expect_str):
- assert write(obj) == expect_str
+ _assert(write(obj) == expect_str)
def expectRead(s, result):
expectWrite(read(s), write(result))
diff --git a/bitfrost/util/urlrange.py b/bitfrost/util/urlrange.py
index 9ef56ad..a5fcbf1 100755
--- a/bitfrost/util/urlrange.py
+++ b/bitfrost/util/urlrange.py
@@ -20,6 +20,8 @@ import urllib2
import urlparse
import zipfile
+from bitfrost import _assert
+
_cache = {}
"""Cache of temporary files used to download URLs."""
_redirect_cache = {}
@@ -274,7 +276,7 @@ class _InnerWrapper(object):
self._setup()
size = self.length - self.offset
self._setup()
- assert self.offset <= self.length
+ _assert(self.offset <= self.length)
if (self.offset + size) > self.length:
size = self.length - self.offset # fixup overlong requests
if size == 0: return [] # we're at the end of our rope.
@@ -299,7 +301,7 @@ class _InnerWrapper(object):
self.close()
raise IOError('could not read: %s' % res.reason)
data = res.read(size)
- assert len(data) == size
+ _assert(len(data) == size)
self.offset += size
res.close()
return data
@@ -448,13 +450,13 @@ def _main():
# test redirects
url = 'http://etoys.laptop.org/xo'
if True:
- assert urlopen(url).url == (url+'/')
+ _assert(urlopen(url).url == (url+'/'))
# check that cached copy is stored with the canonical url
saved = urlretrieve(url)
print saved
os.unlink(saved)
- assert url in _redirect_cache
- assert _redirect_cache[url] in _cache
+ _assert(url in _redirect_cache)
+ _assert(_redirect_cache[url] in _cache)
# test the progress callbacks.
import sys
More information about the Devel
mailing list