Make protected header mandatory
This means that the common iat-verifier logic always calls the protected
header checker in the verifier classes. In case of token compilation it
also includes the protected header returned by the verifier class in
the resulting token.
It is still possible for a verifier to make the protected header
optional by implementing _get_p_header and _parse_p_header accordingly.
Change-Id: Ib802e4e30c4c9d2c85addcb7311ab92da3962b99
Signed-off-by: Mate Toth-Pal <mate.toth-pal@arm.com>
diff --git a/iat-verifier/dev_scripts/generate-sample-iat.py b/iat-verifier/dev_scripts/generate-sample-iat.py
index 29bfa47..84985de 100755
--- a/iat-verifier/dev_scripts/generate-sample-iat.py
+++ b/iat-verifier/dev_scripts/generate-sample-iat.py
@@ -97,5 +97,5 @@
cose_alg=AttestationTokenVerifier.COSE_ALG_ES256,
configuration=None)
with open(outfile, 'wb') as wfh:
- convert_map_to_token(token_map, verifier, wfh, add_p_header=False,
+ convert_map_to_token(token_map, verifier, wfh,
name_as_key=False, parse_raw_value=False)
diff --git a/iat-verifier/iatverifier/attest_token_verifier.py b/iat-verifier/iatverifier/attest_token_verifier.py
index d442a0b..98b162f 100644
--- a/iat-verifier/iatverifier/attest_token_verifier.py
+++ b/iat-verifier/iatverifier/attest_token_verifier.py
@@ -128,8 +128,7 @@
def convert_map_to_token(self,
token_encoder,
token_map,
- *, add_p_header,
- name_as_key,
+ *,name_as_key,
parse_raw_value):
"""Encode a map in cbor format using the 'token_encoder'"""
# pylint: disable=unused-argument
@@ -138,7 +137,7 @@
value = self.__class__.parse_raw(value)
return token_encoder.encode(value)
- def parse_token(self, *, token, check_p_header, lower_case_key):
+ def parse_token(self, *, token, lower_case_key):
"""Parse a token into a map
This function is recursive for composite claims and for token verifiers.
@@ -321,7 +320,7 @@
else:
self._verify_dict(token_item.claim_type, None, token_item.value)
- def _parse_token_dict(self, *, entry_number, token, check_p_header, lower_case_key):
+ def _parse_token_dict(self, *, entry_number, token, lower_case_key):
claim_value = {}
if not isinstance(token, dict):
@@ -336,7 +335,6 @@
name = name.lower()
claim_value[name] = claim.parse_token(
token=val,
- check_p_header=check_p_header,
lower_case_key=lower_case_key)
except KeyError:
claim_value[key] = val
@@ -366,7 +364,7 @@
msg += f' at index {entry_number}'
self.verifier.warning(msg)
- def parse_token(self, *, token, check_p_header, lower_case_key):
+ def parse_token(self, *, token, lower_case_key):
"""This expects a raw token map as 'token'"""
if self.is_list:
@@ -377,19 +375,17 @@
for entry_number, entry in enumerate(token):
claim_value.append(self._parse_token_dict(
entry_number=entry_number,
- check_p_header=check_p_header,
token=entry,
lower_case_key=lower_case_key))
else:
claim_value = self._parse_token_dict(
entry_number=None,
- check_p_header=check_p_header,
token=token,
lower_case_key=lower_case_key)
return TokenItem(value=claim_value, claim_type=self)
- def _encode_dict(self, token_encoder, token_map, *, add_p_header, name_as_key, parse_raw_value):
+ def _encode_dict(self, token_encoder, token_map, *, name_as_key, parse_raw_value):
token_encoder.encode_length(_CBOR_MAJOR_TYPE_MAP, len(token_map))
if name_as_key:
claims = {claim.get_claim_name().lower():
@@ -404,7 +400,6 @@
claim.convert_map_to_token(
token_encoder,
val,
- add_p_header=add_p_header,
name_as_key=name_as_key,
parse_raw_value=parse_raw_value)
except KeyError:
@@ -419,8 +414,7 @@
self,
token_encoder,
token_map,
- *, add_p_header,
- name_as_key,
+ *, name_as_key,
parse_raw_value):
if self.is_list:
token_encoder.encode_length(_CBOR_MAJOR_TYPE_ARRAY, len(token_map))
@@ -428,14 +422,12 @@
self._encode_dict(
token_encoder,
item,
- add_p_header=add_p_header,
name_as_key=name_as_key,
parse_raw_value=parse_raw_value)
else:
self._encode_dict(
token_encoder,
token_map,
- add_p_header=add_p_header,
name_as_key=name_as_key,
parse_raw_value=parse_raw_value)
@@ -551,22 +543,22 @@
super().__init__(verifier=self, necessity=necessity)
- def _sign_token(self, token, add_p_header):
+ def _sign_token(self, token):
"""Signs a token"""
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_RAW:
return token
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_SIGN1:
- return self._sign_eat(token, add_p_header)
+ return self._sign_eat(token)
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_MAC0:
- return self._hmac_eat(token, add_p_header)
+ return self._hmac_eat(token)
err_msg = 'Unexpected method "{}"; must be one of: raw, sign, mac'
raise ValueError(err_msg.format(self.method))
- def _sign_eat(self, token, add_p_header):
+ def _sign_eat(self, token):
protected_header = CoseAttrs()
p_header=self._get_p_header()
key=self._get_signing_key()
- if add_p_header and p_header is not None and key:
+ if p_header is not None and key:
protected_header.update(p_header)
signed_msg = Sign1Message(p_header=protected_header)
signed_msg.payload = token
@@ -576,26 +568,25 @@
return signed_msg.encode()
- def _hmac_eat(self, token, add_p_header):
+ def _hmac_eat(self, token):
protected_header = CoseAttrs()
p_header=self._get_p_header()
key=self._get_signing_key()
- if add_p_header and p_header is not None and key:
+ if p_header is not None and key:
protected_header.update(p_header)
hmac_msg = Mac0Message(payload=token, key=key, p_header=protected_header)
hmac_msg.compute_auth_tag(alg=self.cose_alg)
return hmac_msg.encode()
- def _get_cose_sign1_payload(self, cose, *, check_p_header, verify_signature):
+ def _get_cose_sign1_payload(self, cose, *, verify_signature):
msg = Sign1Message.decode(cose)
if verify_signature:
key = self._get_signing_key()
- if check_p_header:
- try:
- self._parse_p_header(msg)
- except Exception as exc:
- self.error(f'Invalid Protected header: {exc}', exception=exc)
+ try:
+ self._parse_p_header(msg)
+ except Exception as exc:
+ self.error(f'Invalid Protected header: {exc}', exception=exc)
msg.key = key
msg.signature = msg.signers
try:
@@ -605,15 +596,14 @@
return msg.payload, msg.protected_header
- def _get_cose_mac0_payload(self, cose, *, check_p_header, verify_signature):
+ def _get_cose_mac0_payload(self, cose, *, verify_signature):
msg = Mac0Message.decode(cose)
if verify_signature:
key = self._get_signing_key()
- if check_p_header:
- try:
- self._parse_p_header(msg)
- except Exception as exc:
- self.error(f'Invalid Protected header: {exc}', exception=exc)
+ try:
+ self._parse_p_header(msg)
+ except Exception as exc:
+ self.error(f'Invalid Protected header: {exc}', exception=exc)
msg.key = key
try:
msg.verify_auth_tag(alg=self._get_cose_alg())
@@ -622,17 +612,15 @@
return msg.payload, msg.protected_header
- def _get_cose_payload(self, cose, *, check_p_header, verify_signature):
+ def _get_cose_payload(self, cose, *, verify_signature):
"""Return the payload of a COSE envelope"""
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_SIGN1:
return self._get_cose_sign1_payload(
cose,
- check_p_header=check_p_header,
verify_signature=verify_signature)
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_MAC0:
return self._get_cose_mac0_payload(
cose,
- check_p_header=check_p_header,
verify_signature=verify_signature)
err_msg = f'Unexpected method "{self._get_method()}"; must be one of: sign, mac'
raise ValueError(err_msg)
@@ -641,8 +629,7 @@
self,
token_encoder,
token_map,
- *, add_p_header,
- name_as_key,
+ *, name_as_key,
parse_raw_value,
root=False):
with BytesIO() as b_io:
@@ -662,14 +649,13 @@
self.claims.convert_map_to_token(
encoder,
token_map,
- add_p_header=add_p_header,
name_as_key=name_as_key,
parse_raw_value=parse_raw_value)
token = b_io.getvalue()
# Sign and pack in a COSE envelope if necessary
- signed_token = self._sign_token(token, add_p_header=add_p_header)
+ signed_token = self._sign_token(token)
# Pack as a bstr if necessary
if root:
@@ -677,7 +663,7 @@
else:
token_encoder.encode_bytestring(signed_token)
- def parse_token(self, *, token, check_p_header, lower_case_key):
+ def parse_token(self, *, token, lower_case_key):
if self._get_method() == AttestationTokenVerifier.SIGN_METHOD_RAW:
payload = token
protected_header = None
@@ -685,7 +671,6 @@
try:
payload, protected_header = self._get_cose_payload(
token,
- check_p_header=check_p_header,
# signature verification is done in the verify function
verify_signature=False)
except Exception as exc:
@@ -706,13 +691,11 @@
token_items = self.claims.parse_token(
token=raw_map,
- check_p_header=check_p_header,
lower_case_key=lower_case_key)
ret = TokenItem(value=token_items, claim_type=self)
ret.wrapping_tag = raw_map_tag
ret.token = token
- ret.check_p_header = check_p_header
ret.protected_header = protected_header
return ret
@@ -721,7 +704,6 @@
try:
self._get_cose_payload(
token_item.token,
- check_p_header=token_item.check_p_header,
verify_signature=(self._get_signing_key() is not None))
except Exception as exc:
msg = f'Bad COSE: {exc}'
diff --git a/iat-verifier/iatverifier/cca_token_verifier.py b/iat-verifier/iatverifier/cca_token_verifier.py
index ff0df7b..df9ddab 100644
--- a/iat-verifier/iatverifier/cca_token_verifier.py
+++ b/iat-verifier/iatverifier/cca_token_verifier.py
@@ -181,7 +181,6 @@
try:
token_item.claim_type._get_cose_payload(
token_item.token,
- check_p_header=False, # already done in the parent's verify
verify_signature=True)
except ValueError:
self.error("Realm signature doesn't match Realm Public Key claim in Realm token.")
diff --git a/iat-verifier/iatverifier/util.py b/iat-verifier/iatverifier/util.py
index bc78aad..9350979 100644
--- a/iat-verifier/iatverifier/util.py
+++ b/iat-verifier/iatverifier/util.py
@@ -25,7 +25,7 @@
"NIST521p": AttestationTokenVerifier.COSE_ALG_ES512,
}
-def convert_map_to_token(token_map, verifier, wfh, *, add_p_header, name_as_key, parse_raw_value):
+def convert_map_to_token(token_map, verifier, wfh, *, name_as_key, parse_raw_value):
"""
Convert a map to token and write the result to a file.
"""
@@ -33,7 +33,6 @@
verifier.convert_map_to_token(
encoder,
token_map,
- add_p_header=add_p_header,
name_as_key=name_as_key,
parse_raw_value=parse_raw_value,
root=True)
diff --git a/iat-verifier/scripts/check_iat b/iat-verifier/scripts/check_iat
index ccea2a4..6994742 100755
--- a/iat-verifier/scripts/check_iat
+++ b/iat-verifier/scripts/check_iat
@@ -57,10 +57,6 @@
help='''
Report failure if unknown claim is encountered.
''')
- parser.add_argument('-c', '--check-protected-header', action='store_true',
- help='''
- Check the presence and content of COSE protected header.
- ''')
parser.add_argument('-m', '--method', choices=['sign', 'mac'], default='sign',
help='''
Specify how this token is wrapped -- whether Sign1Message or
@@ -143,7 +139,6 @@
with open(args.tokenfile, 'rb') as token_file:
token = verifier.parse_token(
token=token_file.read(),
- check_p_header=args.check_protected_header,
lower_case_key=False)
token.verify()
if key_checked:
diff --git a/iat-verifier/scripts/compile_token b/iat-verifier/scripts/compile_token
index ef49961..e060529 100755
--- a/iat-verifier/scripts/compile_token
+++ b/iat-verifier/scripts/compile_token
@@ -48,10 +48,6 @@
sign the CCA Realm token. If this is not specified, the
token will be unsigned.''')
group = parser.add_mutually_exclusive_group()
- parser.add_argument('-a', '--add-protected-header', action='store_true',
- help='''
- Add protected header to the COSE wrapper.
- ''')
group.add_argument('-r', '--raw', action='store_true',
help='''Generate raw CBOR and do not create a signature
or COSE wrapper.''')
@@ -145,7 +141,6 @@
token_map,
verifier,
wfh,
- add_p_header=args.add_protected_header,
name_as_key=True,
parse_raw_value=True)
else:
@@ -154,6 +149,5 @@
token_map,
verifier,
wfh,
- add_p_header=args.add_protected_header,
name_as_key=True,
parse_raw_value=True)
diff --git a/iat-verifier/scripts/decompile_token b/iat-verifier/scripts/decompile_token
index c81f330..ae75053 100755
--- a/iat-verifier/scripts/decompile_token
+++ b/iat-verifier/scripts/decompile_token
@@ -79,7 +79,6 @@
with open(args.source, 'rb') as fh:
token_map = verifier.parse_token(
token=fh.read(),
- check_p_header=False,
lower_case_key=True).get_token_map()
if args.outfile:
diff --git a/iat-verifier/tests/data/cca_platform_token.cbor b/iat-verifier/tests/data/cca_platform_token.cbor
index 8d97a0c..a5f7a97 100644
--- a/iat-verifier/tests/data/cca_platform_token.cbor
+++ b/iat-verifier/tests/data/cca_platform_token.cbor
Binary files differ
diff --git a/iat-verifier/tests/data/iat-hmac.cbor b/iat-verifier/tests/data/iat-hmac.cbor
index 1ea3018..eb163b0 100644
--- a/iat-verifier/tests/data/iat-hmac.cbor
+++ b/iat-verifier/tests/data/iat-hmac.cbor
Binary files differ
diff --git a/iat-verifier/tests/data/iat.cbor b/iat-verifier/tests/data/iat.cbor
index 38d2ec4..9809c3e 100644
--- a/iat-verifier/tests/data/iat.cbor
+++ b/iat-verifier/tests/data/iat.cbor
Binary files differ
diff --git a/iat-verifier/tests/synthetic_data/unknown_claims.cbor b/iat-verifier/tests/synthetic_data/unknown_claims.cbor
index 1c9507d..58987a2 100644
--- a/iat-verifier/tests/synthetic_data/unknown_claims.cbor
+++ b/iat-verifier/tests/synthetic_data/unknown_claims.cbor
Binary files differ
diff --git a/iat-verifier/tests/test_synthetic.py b/iat-verifier/tests/test_synthetic.py
index 413703d..796e845 100644
--- a/iat-verifier/tests/test_synthetic.py
+++ b/iat-verifier/tests/test_synthetic.py
@@ -16,7 +16,8 @@
from iatverifier.util import read_token_map, read_keyfile
from iatverifier.attest_token_verifier import VerifierConfiguration, AttestationTokenVerifier
from tests.synthetic_token_verifier import SyntheticTokenVerifier2, SyntheticTokenVerifier
-from test_utils import read_iat, create_and_read_iat, convert_map_to_token_bytes, bytes_equal_to_file
+from tests.test_utils import read_iat, create_and_read_iat
+from tests.test_utils import convert_map_to_token_bytes, bytes_equal_to_file
THIS_DIR = os.path.dirname(__file__)
@@ -99,13 +100,10 @@
configuration=self.config,
internal_signing_key=signing_key)
- token_p_header = convert_map_to_token_bytes(token_map, verifier, add_p_header=True)
- token_no_p_header = convert_map_to_token_bytes(token_map, verifier, add_p_header=False)
+ token_p_header = convert_map_to_token_bytes(token_map, verifier)
self.assertTrue(
bytes_equal_to_file(token_p_header, os.path.join(DATA_DIR, 'p_header_on.cbor')))
- self.assertTrue(
- bytes_equal_to_file(token_no_p_header, os.path.join(DATA_DIR, 'p_header_off.cbor')))
with self.assertLogs() as test_ctx:
read_iat(
@@ -115,8 +113,7 @@
cose_alg=cose_alg,
signing_key=signing_key,
configuration=config,
- internal_signing_key=signing_key),
- check_p_header=True)
+ internal_signing_key=signing_key))
self.assertEquals(2, len(test_ctx.output))
self.assertIn('Unexpected protected header', test_ctx.output[0])
self.assertIn('Missing alg from protected header (expected ES256)', test_ctx.output[1])
@@ -129,8 +126,7 @@
cose_alg=cose_alg,
signing_key=signing_key,
configuration=config,
- internal_signing_key=signing_key),
- check_p_header=True)
+ internal_signing_key=signing_key))
self.assertEquals(2, len(test_ctx.output))
self.assertIn('Missing alg from protected header (expected ES256)', test_ctx.output[0])
self.assertIn('Unexpected protected header', test_ctx.output[1])
@@ -152,9 +148,10 @@
signing_key=signing_key,
configuration=config,
internal_signing_key=signing_key))
- self.assertEquals(2, len(test_ctx.output))
+ self.assertEquals(3, len(test_ctx.output))
self.assertIn('Unexpected tag (0xcdcd) in token SYNTHETIC_TOKEN', test_ctx.output[0])
- self.assertIn('Unexpected tag (0xabab) in token SYNTHETIC_INTERNAL_TOKEN', test_ctx.output[1])
+ self.assertIn('Invalid Protected header: Missing alg from protected header (expected ES256)', test_ctx.output[1])
+ self.assertIn('Unexpected tag (0xabab) in token SYNTHETIC_INTERNAL_TOKEN', test_ctx.output[2])
# test with missing tag
with self.assertLogs() as test_ctx:
diff --git a/iat-verifier/tests/test_utils.py b/iat-verifier/tests/test_utils.py
index e08d3a2..c769644 100644
--- a/iat-verifier/tests/test_utils.py
+++ b/iat-verifier/tests/test_utils.py
@@ -26,33 +26,31 @@
return False
return True
-def convert_map_to_token_bytes(token_map, verifier, add_p_header):
+def convert_map_to_token_bytes(token_map, verifier):
"""Converts a map to cbor token"""
with BytesIO() as bytes_io:
encoder = CBOREncoder(bytes_io)
verifier.convert_map_to_token(
encoder,
token_map,
- add_p_header=add_p_header,
name_as_key=True,
parse_raw_value=True,
root=True)
return bytes_io.getvalue()
-def create_token(data_dir, source_name, verifier, add_p_header):
+def create_token(data_dir, source_name, verifier):
"""Creats a cbor token from a yaml file."""
source_path = os.path.join(data_dir, source_name)
token_map = read_token_map(source_path)
- return convert_map_to_token_bytes(token_map, verifier, add_p_header)
+ return convert_map_to_token_bytes(token_map, verifier)
-def create_token_file(data_dir, source_name, verifier, dest_path, *, add_p_header=True):
+def create_token_file(data_dir, source_name, verifier, dest_path):
"""Create a cbor token from a yaml file and write it to a file
"""
token = create_token(
data_dir=data_dir,
source_name=source_name,
- verifier=verifier,
- add_p_header=add_p_header)
+ verifier=verifier)
with open(dest_path, 'wb') as wfh:
wfh.write(token)
@@ -67,13 +65,12 @@
return dest_path
-def read_iat(data_dir, filename, verifier, *, check_p_header=False):
+def read_iat(data_dir, filename, verifier):
"""Read a cbor file and returns the parsed dictionary"""
filepath = os.path.join(data_dir, filename)
with open(filepath, 'rb') as file:
token_item = verifier.parse_token(
token=file.read(),
- check_p_header=check_p_header,
lower_case_key=False)
token_item.verify()
token_item.get_token_map()
diff --git a/iat-verifier/tests/test_verifier.py b/iat-verifier/tests/test_verifier.py
index 2ea1b5e..8f64be8 100644
--- a/iat-verifier/tests/test_verifier.py
+++ b/iat-verifier/tests/test_verifier.py
@@ -61,7 +61,6 @@
with open(good_sig, 'rb') as wfh:
token_item = verifier_good_sig.parse_token(
token=wfh.read(),
- check_p_header=False,
lower_case_key=False)
token_item.verify()
@@ -69,7 +68,6 @@
with open(bad_sig, 'rb') as wfh:
token_item = verifier_good_sig.parse_token(
token=wfh.read(),
- check_p_header=False,
lower_case_key=False)
token_item.verify()