Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
lwc
/
compare
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Pipelines
Members
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
4fa05cc7
authored
Feb 21, 2020
by
Sebastian Renner
Browse files
Options
Browse Files
Download
Plain Diff
Mergerino
parents
12fe513c
3a376339
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
864 additions
and
1120 deletions
+864
-1120
compile_all.py
+10
-11
templates/bluepill/openocd.cfg
+2
-2
templates/bluepill/test
+109
-180
templates/esp32/test
+83
-170
templates/f7/test
+78
-225
templates/maixduino/test
+88
-174
templates/uno/test
+80
-170
test.py
+0
-188
test_common.py
+414
-0
No files found.
compile_all.py
View file @
4fa05cc7
...
@@ -5,7 +5,6 @@ import sys
...
@@ -5,7 +5,6 @@ import sys
import
stat
import
stat
import
argparse
import
argparse
import
shutil
import
shutil
import
random
import
subprocess
import
subprocess
import
shutil
import
shutil
...
@@ -15,7 +14,7 @@ def build(algo_dir, template_dir, build_dir):
...
@@ -15,7 +14,7 @@ def build(algo_dir, template_dir, build_dir):
return
None
return
None
print
(
"Building in
%
s"
%
build_dir
)
print
(
"Building in
%
s"
%
build_dir
)
# copy all the files from the submitted algorithm into the build directory
# copy all the files from the submitted algorithm into the build directory
shutil
.
copytree
(
algo_dir
,
build_dir
)
shutil
.
copytree
(
algo_dir
,
build_dir
)
...
@@ -68,7 +67,6 @@ def build(algo_dir, template_dir, build_dir):
...
@@ -68,7 +67,6 @@ def build(algo_dir, template_dir, build_dir):
p
.
wait
()
p
.
wait
()
assert
p
.
returncode
==
0
assert
p
.
returncode
==
0
finally
:
finally
:
sys
.
stdout
.
flush
()
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
sys
.
stderr
.
flush
()
...
@@ -122,7 +120,8 @@ def main(argv):
...
@@ -122,7 +120,8 @@ def main(argv):
# get all the submissions by looking for files named "api.h"
# get all the submissions by looking for files named "api.h"
subfiles
=
[]
subfiles
=
[]
for
submission
in
subs
:
for
submission
in
subs
:
implementations_dir
=
os
.
path
.
join
(
submissions_dir
,
submission
,
"Implementations"
,
"crypto_aead"
)
implementations_dir
=
os
.
path
.
join
(
submissions_dir
,
submission
,
"Implementations"
,
"crypto_aead"
)
if
not
os
.
path
.
isdir
(
implementations_dir
):
if
not
os
.
path
.
isdir
(
implementations_dir
):
continue
continue
...
@@ -164,19 +163,17 @@ def main(argv):
...
@@ -164,19 +163,17 @@ def main(argv):
pieces
=
f
.
split
(
os
.
sep
)
pieces
=
f
.
split
(
os
.
sep
)
n
=
pieces
[
1
]
+
"."
+
"."
.
join
(
pieces
[
4
:
-
1
])
n
=
pieces
[
1
]
+
"."
+
"."
.
join
(
pieces
[
4
:
-
1
])
print
(
n
)
print
(
n
)
# if include_list was provided, skip elements not in the list
# if include_list was provided, skip elements not in the list
if
include_list
is
not
None
:
if
include_list
is
not
None
:
if
n
ot
n
in
include_list
:
if
n
not
in
include_list
:
continue
continue
# Put all in a tuple and count
# Put all in a tuple and count
files
.
append
((
t
,
d
,
n
))
files
.
append
((
t
,
d
,
n
))
# For testing, we only do the first 1
# For testing, we only do the first 1
#files = files[:1]
#
files = files[:1]
print
(
"
%
d algorithms will be compiled"
%
len
(
files
))
print
(
"
%
d algorithms will be compiled"
%
len
(
files
))
if
not
os
.
path
.
isdir
(
build_root_dir
):
if
not
os
.
path
.
isdir
(
build_root_dir
):
...
@@ -198,8 +195,10 @@ def main(argv):
...
@@ -198,8 +195,10 @@ def main(argv):
b
=
build
(
d
,
template_dir
,
build_dir
)
b
=
build
(
d
,
template_dir
,
build_dir
)
if
b
is
None
:
if
b
is
None
:
continue
continue
test_script
.
write
(
"
\n\n
echo
\"
TEST NUMBER
%03
d: TESTING
%
s
\"\n
"
%
(
i
,
d
))
test_script
.
write
(
test_script
.
write
(
"python3 -u ./test.py
%
s
%
s 2>
%
s | tee
%
s
\n
"
%
(
"
\n\n
echo
\"
TEST NUMBER
%03
d: TESTING
%
s
\"\n
"
%
(
i
,
d
))
test_script
.
write
(
"python3 -u ./test.py
%
s
%
s 2>
%
s | tee
%
s
\n
"
%
(
t
,
t
,
os
.
path
.
join
(
b
,
'test'
),
os
.
path
.
join
(
b
,
'test'
),
os
.
path
.
join
(
b
,
'test_stderr.log'
),
os
.
path
.
join
(
b
,
'test_stderr.log'
),
...
...
templates/bluepill/openocd.cfg
View file @
4fa05cc7
...
@@ -21,6 +21,6 @@ source [find target/stm32f1x.cfg]
...
@@ -21,6 +21,6 @@ source [find target/stm32f1x.cfg]
#tpiu config internal swodump.stm32f103-generic.log uart off 72000000
#tpiu config internal swodump.stm32f103-generic.log uart off 72000000
#
reset_config srst_only srst_push_pull srst_nogate connect_assert_srst
reset_config srst_only srst_push_pull srst_nogate connect_assert_srst
reset_config none srst_push_pull srst_nogate
#
reset_config none srst_push_pull srst_nogate
templates/bluepill/test
View file @
4fa05cc7
...
@@ -2,197 +2,126 @@
...
@@ -2,197 +2,126 @@
import
os
import
os
import
sys
import
sys
import
time
import
struct
import
serial
import
subprocess
import
subprocess
import
serial.tools.list_ports
from
test_common
import
(
LogicMultiplexerTimeMeasurements
,
parse_nist_aead_test_vectors
,
DeviceUnderTestAeadUARTP
,
compare_dumps
,
eprint
,
run_nist_aead_test_line
,
)
def
get_serial
():
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
if
p
.
serial_number
==
'FT2XCRZ1'
]
devices
.
sort
()
return
serial
.
Serial
(
devices
[
0
],
baudrate
=
115200
,
timeout
=
5
)
class
BluePill
(
DeviceUnderTestAeadUARTP
):
RAM_SIZE
=
0x5000
def
__init__
(
self
,
build_dir
):
DeviceUnderTestAeadUARTP
.
__init__
(
self
,
get_serial
())
self
.
firmware_path
=
os
.
path
.
join
(
build_dir
,
'.pio/build/bluepill_f103c8/firmware.elf'
)
self
.
ram_pattern_path
=
os
.
path
.
join
(
build_dir
,
'empty_ram.bin'
)
self
.
ram_dump_path
=
os
.
path
.
join
(
build_dir
,
'ram_dump.bin'
)
self
.
openocd_cfg_path
=
os
.
path
.
join
(
build_dir
,
'openocd.cfg'
)
def
flash
(
self
):
# pipe = subprocess.PIPE
cmd
=
[
'openocd'
,
'-f'
,
'openocd.cfg'
,
'-c'
,
'program
%
s verify reset exit'
%
self
.
firmware_path
]
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
sys
.
stdout
)
stdout
,
stderr
=
p
.
communicate
(
""
)
eprint
(
"Firmware flashed."
)
cmd
=
[
'openocd'
,
'-f'
,
self
.
openocd_cfg_path
,
'-c'
,
'program
%
s reset exit 0x20000000'
%
self
.
ram_pattern_path
]
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
sys
.
stdout
)
stdout
,
stderr
=
p
.
communicate
(
""
)
eprint
(
"RAM flashed."
)
def
dump_ram
(
self
):
cmd
=
[
'openocd'
,
'-f'
,
self
.
openocd_cfg_path
,
'-c'
,
'init'
,
'-c'
,
'halt'
,
'-c'
,
'dump_image
%
s 0x20000000 0x
%
x'
%
(
self
.
ram_dump_path
,
BluePill
.
RAM_SIZE
),
'-c'
,
'resume'
,
'-c'
,
'exit'
]
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
sys
.
stdout
)
stdout
,
stderr
=
p
.
communicate
(
""
)
eprint
(
"RAM dumped."
)
with
open
(
self
.
ram_dump_path
,
'rb'
)
as
ram
:
ram
=
ram
.
read
()
if
len
(
ram
)
!=
BluePill
.
RAM_SIZE
:
raise
Exception
(
"RAM dump was
%
d bytes instead of
%
d"
%
(
len
(
ram
),
BluePill
.
RAM_SIZE
))
return
ram
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
def
main
(
argv
):
if
len
(
argv
)
!=
3
:
print
(
"Usage: test LWC_AEAD_KAT.txt build_dir"
)
return
1
def
flash
():
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
1
]))
pipe
=
subprocess
.
PIPE
build_dir
=
argv
[
2
]
cmd
=
[
'openocd'
,
'-f'
,
'openocd.cfg'
,
'-c'
'program '
+
'.pio/build/bluepill_f103c8/firmware.elf verify reset exit'
]
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
sys
.
stdout
)
stdout
,
stderr
=
p
.
communicate
(
""
)
dut
=
BluePill
(
build_dir
)
def
fill_ram
():
try
:
pipe
=
subprocess
.
PIPE
tool
=
LogicMultiplexerTimeMeasurements
(
0x0003
)
cmd
=
[
'openocd'
,
'-f'
,
'openocd.cfg'
,
'-c'
'program '
+
tool
.
begin_measurement
()
'empty_ram.bin reset exit 0x20000000'
]
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
sys
.
stdout
)
stdout
,
stderr
=
p
.
communicate
(
""
)
dut
.
flash
()
dut
.
prepare
()
sys
.
stdout
.
write
(
"Board prepared
\n
"
)
sys
.
stdout
.
flush
()
def
get_serial
():
dump_a
=
dut
.
dump_ram
()
import
serial.tools.list_ports
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
]
devices
.
sort
()
return
devices
[
-
1
]
class
UARTP
:
def
__init__
(
self
,
ser
):
UARTP
.
SYN
=
0xf9
UARTP
.
FIN
=
0xf3
self
.
ser
=
ser
def
uart_read
(
self
):
r
=
self
.
ser
.
read
(
1
)
if
len
(
r
)
!=
1
:
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
eprint
(
"sent frame '
%
s'"
%
buf
.
hex
())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
l
=
self
.
uart_read
()
if
l
&
0x80
:
l
&=
0x7f
l
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
l
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
eprint
(
"rcvd frame '
%
s'"
%
buf
.
hex
())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
main
(
argv
):
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
eprint
(
argv
[
0
])
tool
.
arm
()
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
if
len
(
script_dir
)
>
0
:
tool
.
unarm
()
os
.
chdir
(
script_dir
)
if
i
==
1
:
dev
=
get_serial
()
dump_b
=
dut
.
dump_ram
()
ser
=
serial
.
Serial
(
dev
,
baudrate
=
115200
,
timeout
=
5
)
longest
=
compare_dumps
(
dump_a
,
dump_b
)
uartp
=
UARTP
(
ser
)
print
(
" longest chunk of untouched memory =
%
d"
%
longest
)
flash
()
except
Exception
as
ex
:
fill_ram
()
print
(
"TEST FAILED"
)
eprint
(
"Flashed"
)
raise
ex
time
.
sleep
(
0.1
)
finally
:
ser
.
setDTR
(
False
)
# IO0=HIGH
tool
.
end_measurement
()
ser
.
setRTS
(
True
)
# EN=LOW, chip in reset
sys
.
stdout
.
flush
()
time
.
sleep
(
0.1
)
sys
.
stderr
.
flush
()
ser
.
setDTR
(
False
)
# IO0=HIGH
ser
.
setRTS
(
False
)
# EN=HIGH, chip out of reset
time
.
sleep
(
1
)
def
stdin_read
(
n
):
b
=
sys
.
stdin
.
buffer
.
read
(
n
)
if
len
(
b
)
!=
n
:
sys
.
exit
(
1
)
return
b
def
stdin_readvar
():
l
=
stdin_read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
v
=
stdin_read
(
l
)
return
v
exp_hello
=
b
"Hello, World!"
hello
=
ser
.
read
(
ser
.
in_waiting
)
if
hello
[
-
13
:]
!=
exp_hello
:
eprint
(
"Improper board initialization message:
%
s"
%
hello
)
return
1
eprint
(
"Board initialized properly"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
sys
.
stdout
.
flush
()
while
1
:
action
=
stdin_read
(
1
)[
0
]
eprint
(
"Command
%
c from stdin"
%
action
)
if
action
in
b
"ackmps"
:
v
=
stdin_readvar
()
uartp
.
send
(
struct
.
pack
(
"B"
,
action
)
+
v
)
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged variable transfer"
)
eprint
(
"Var
%
c successfully sent to board"
%
action
)
elif
action
in
b
"ACKMPS"
:
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
v
=
uartp
.
recv
()
if
len
(
v
)
<
1
or
v
[
0
]
!=
action
:
raise
Exception
(
"Could not obtain variable from board"
)
v
=
v
[
1
:]
eprint
(
"Var
%
c received from board:
%
s"
%
(
action
,
v
.
hex
()))
l
=
struct
.
pack
(
"<I"
,
len
(
v
))
sys
.
stdout
.
buffer
.
write
(
l
+
v
)
sys
.
stdout
.
flush
()
elif
action
in
b
"ed"
:
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged variable transfer"
)
eprint
(
"Operation
%
c completed successfully"
%
action
)
else
:
raise
Exception
(
"Unknown action
%
c"
%
action
)
return
0
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
...
...
templates/esp32/test
View file @
4fa05cc7
...
@@ -3,186 +3,99 @@
...
@@ -3,186 +3,99 @@
import
os
import
os
import
sys
import
sys
import
time
import
time
import
struct
import
serial
import
subprocess
import
subprocess
import
serial.tools.list_ports
from
test_common
import
(
LogicMultiplexerTimeMeasurements
,
parse_nist_aead_test_vectors
,
DeviceUnderTestAeadUARTP
,
eprint
,
run_nist_aead_test_line
,
)
def
get_serial
():
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
if
(
p
.
vid
==
4292
and
p
.
pid
==
60000
)
]
devices
.
sort
()
return
devices
[
0
]
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
class
ESP32
(
DeviceUnderTestAeadUARTP
):
def
flash
(
tty
=
None
):
def
__init__
(
self
,
build_dir
):
pipe
=
subprocess
.
PIPE
DeviceUnderTestAeadUARTP
.
__init__
(
self
)
cmd
=
[
'platformio'
,
'run'
,
'-e'
,
'esp32dev'
,
'--target'
,
'upload'
]
if
tty
is
not
None
:
self
.
build_dir
=
build_dir
cmd
.
extend
([
'--upload-port'
,
tty
])
p
=
subprocess
.
Popen
(
cmd
,
def
flash
(
self
):
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
pipe
=
subprocess
.
PIPE
stdout
,
stderr
=
p
.
communicate
(
""
)
previous_dir
=
os
.
path
.
abspath
(
os
.
curdir
)
os
.
chdir
(
self
.
build_dir
)
cmd
=
[
'platformio'
,
'run'
,
'-e'
,
'esp32dev'
,
'--target'
,
'upload'
]
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
stdout
,
stderr
=
p
.
communicate
(
""
)
eprint
(
"Firmware flashed."
)
os
.
chdir
(
previous_dir
)
def
dump_ram
(
self
):
return
None
def
get_serial
():
import
serial.tools.list_ports
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
]
devices
.
sort
()
return
devices
[
-
1
]
class
UARTP
:
def
__init__
(
self
,
ser
):
UARTP
.
SYN
=
0xf9
UARTP
.
FIN
=
0xf3
self
.
ser
=
ser
def
uart_read
(
self
):
r
=
self
.
ser
.
read
(
1
)
if
len
(
r
)
!=
1
:
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
eprint
(
"sent frame '
%
s'"
%
buf
.
hex
())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
l
=
self
.
uart_read
()
if
l
&
0x80
:
l
&=
0x7f
l
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
l
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
eprint
(
"rcvd frame '
%
s'"
%
buf
.
hex
())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
main
(
argv
):
def
main
(
argv
):
eprint
(
argv
[
0
])
if
len
(
argv
)
!=
3
:
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
print
(
"Usage: test LWC_AEAD_KAT.txt build_dir"
)
if
len
(
script_dir
)
>
0
:
os
.
chdir
(
script_dir
)
dev
=
get_serial
()
flash
(
dev
)
eprint
(
"Flashed"
)
time
.
sleep
(
0.1
)
ser
=
serial
.
Serial
(
dev
,
baudrate
=
500000
,
timeout
=
5
)
uartp
=
UARTP
(
ser
)
ser
.
setDTR
(
False
)
# IO0=HIGH
ser
.
setRTS
(
True
)
# EN=LOW, chip in reset
time
.
sleep
(
0.1
)
ser
.
setDTR
(
False
)
# IO0=HIGH
ser
.
setRTS
(
False
)
# EN=HIGH, chip out of reset
time
.
sleep
(
1
)
def
stdin_read
(
n
):
b
=
sys
.
stdin
.
buffer
.
read
(
n
)
if
len
(
b
)
!=
n
:
sys
.
exit
(
1
)
return
b
def
stdin_readvar
():
l
=
stdin_read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
v
=
stdin_read
(
l
)
return
v
exp_hello
=
b
"Hello, World!"
hello
=
ser
.
read
(
ser
.
in_waiting
)
if
hello
[
-
13
:]
!=
exp_hello
:
eprint
(
"Improper board initialization message:
%
s"
%
hello
)
return
1
return
1
eprint
(
"Board initialized properly"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
1
]))
sys
.
stdout
.
flush
()
build_dir
=
argv
[
2
]
while
1
:
dut
=
ESP32
(
build_dir
)
action
=
stdin_read
(
1
)[
0
]
eprint
(
"Command
%
c from stdin"
%
action
)
try
:
tool
=
LogicMultiplexerTimeMeasurements
(
0x0030
)
if
action
in
b
"ackmps"
:
tool
.
begin_measurement
()
v
=
stdin_readvar
()
uartp
.
send
(
struct
.
pack
(
"B"
,
action
)
+
v
)
dut
.
flash
()
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
ser
=
serial
.
Serial
(
raise
Exception
(
"Unacknowledged variable transfer"
)
get_serial
(),
eprint
(
"Var
%
c successfully sent to board"
%
action
)
baudrate
=
500000
,
timeout
=
5
)
elif
action
in
b
"ACKMPS"
:
c
=
struct
.
pack
(
"B"
,
action
)
ser
.
setDTR
(
False
)
# IO0=HIGH
uartp
.
send
(
c
)
ser
.
setRTS
(
True
)
# EN=LOW, chip in reset
v
=
uartp
.
recv
()
time
.
sleep
(
0.1
)
if
len
(
v
)
<
1
or
v
[
0
]
!=
action
:
ser
.
setDTR
(
False
)
# IO0=HIGH
raise
Exception
(
"Could not obtain variable from board"
)
ser
.
setRTS
(
False
)
# EN=HIGH, chip out of reset
v
=
v
[
1
:]
time
.
sleep
(
1
)
eprint
(
"Var
%
c received from board:
%
s"
%
(
action
,
v
.
hex
()))
l
=
struct
.
pack
(
"<I"
,
len
(
v
))
dut
.
ser
=
ser
sys
.
stdout
.
buffer
.
write
(
l
+
v
)
sys
.
stdout
.
flush
()
dut
.
prepare
()
sys
.
stdout
.
write
(
"Board prepared
\n
"
)
elif
action
in
b
"ed"
:
sys
.
stdout
.
flush
()
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
ack
=
uartp
.
recv
()
tool
.
arm
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
raise
Exception
(
"Unacknowledged variable transfer"
)
tool
.
unarm
()
eprint
(
"Operation
%
c completed successfully"
%
action
)
except
Exception
as
ex
:
else
:
print
(
"TEST FAILED"
)
raise
Exception
(
"Unknown action
%
c"
%
action
)
raise
ex
finally
:
return
0
tool
.
end_measurement
()
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
...
...
templates/f7/test
View file @
4fa05cc7
...
@@ -2,245 +2,98 @@
...
@@ -2,245 +2,98 @@
import
os
import
os
import
sys
import
sys
import
time
import
pylink
import
struct
import
serial.tools.list_ports
import
serial
from
test_common
import
(
import
subprocess
LogicMultiplexerTimeMeasurements
,
parse_nist_aead_test_vectors
,
DeviceUnderTestAeadUARTP
,
compare_dumps
,
eprint
,
run_nist_aead_test_line
,
)
RAM_SIZE
=
0x50000
def
get_serial
():
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
if
p
.
serial_number
==
'FT2XA9MY'
]
devices
.
sort
()
return
serial
.
Serial
(
devices
[
0
],
baudrate
=
115200
,
timeout
=
5
)
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
class
F7
(
DeviceUnderTestAeadUARTP
):
RAM_SIZE
=
0x50000
def
popen_jlink
():
def
__init__
(
self
,
firmware_path
,
ram_pattern_path
):
pipe
=
subprocess
.
PIPE
DeviceUnderTestAeadUARTP
.
__init__
(
self
,
get_serial
())
cmd
=
[
'JLinkExe'
]
self
.
jlink
=
pylink
.
JLink
()
cmd
.
extend
([
'-autoconnect'
,
'1'
])
self
.
jlink
.
open
(
779340002
)
cmd
.
extend
([
'-device'
,
'STM32F746ZG'
])
self
.
firmware_path
=
firmware_path
cmd
.
extend
([
'-if'
,
'swd'
])
self
.
ram_pattern_path
=
ram_pattern_path
cmd
.
extend
([
'-speed'
,
'4000'
])
return
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
def
flash
(
self
):
jlink
=
self
.
jlink
jlink
.
connect
(
'STM32F746ZG'
)
jlink
.
flash_file
(
self
.
firmware_path
,
0x8000000
)
eprint
(
"Firmware flashed."
)
jlink
.
flash_file
(
self
.
ram_pattern_path
,
0x20000000
)
eprint
(
"RAM flashed."
)
jlink
.
reset
()
jlink
.
restart
()
def
flash
():
def
dump_ram
(
self
):
p
=
popen_jlink
()
jlink
=
self
.
jlink
return
p
.
communicate
((
"""
return
bytes
(
jlink
.
memory_read8
(
0x20000000
,
F7
.
RAM_SIZE
))
loadbin build/f7.bin 0x8000000
r
g
exit
"""
)
.
encode
(
'ascii'
))
def
fill_ram
():
def
main
(
argv
):
p
=
popen_jlink
()
if
len
(
argv
)
!=
3
:
return
p
.
communicate
((
"""
print
(
"Usage: test LWC_AEAD_KAT.txt build_dir"
)
h
return
1
loadbin ram_pattern.bin 0x20000000
savebin ram_copy.bin 0x20000000 0x
%
x
r
g
exit
"""
%
RAM_SIZE
)
.
encode
(
'ascii'
))
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
1
]))
build_dir
=
argv
[
2
]
def
dump_ram
():
dut
=
F7
(
p
=
popen_jlink
()
os
.
path
.
join
(
build_dir
,
'build'
,
'f7.bin'
),
return
p
.
communicate
((
"""
os
.
path
.
join
(
build_dir
,
'ram_pattern.bin'
))
h
savebin ram_dump.bin 0x20000000 0x
%
x
exit
"""
%
RAM_SIZE
)
.
encode
(
'ascii'
))
try
:
tool
=
LogicMultiplexerTimeMeasurements
(
0x000c
)
tool
.
begin_measurement
()
def
get_serial
():
dut
.
flash
()
import
serial.tools.list_ports
dut
.
prepare
()
ports
=
serial
.
tools
.
list_ports
.
comports
()
sys
.
stdout
.
write
(
"Board prepared
\n
"
)
devices
=
[
p
.
device
for
p
in
ports
]
sys
.
stdout
.
flush
()
devices
.
sort
()
return
devices
[
-
1
]
class
UARTP
:
def
__init__
(
self
,
ser
):
UARTP
.
SYN
=
0xf9
UARTP
.
FIN
=
0xf3
self
.
ser
=
ser
def
uart_read
(
self
):
r
=
self
.
ser
.
read
(
1
)
if
len
(
r
)
!=
1
:
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
eprint
(
"sent frame '
%
s'"
%
buf
.
hex
())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
l
=
self
.
uart_read
()
if
l
&
0x80
:
l
&=
0x7f
l
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
l
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
eprint
(
"rcvd frame '
%
s'"
%
buf
.
hex
())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
main
(
argv
):
dump_a
=
dut
.
dump_ram
()
eprint
(
argv
[
0
])
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
if
len
(
script_dir
)
>
0
:
tool
.
arm
()
os
.
chdir
(
script_dir
)
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
tool
.
unarm
()
dev
=
get_serial
()
ser
=
serial
.
Serial
(
dev
,
baudrate
=
115200
,
timeout
=
5
)
if
i
==
1
:
uartp
=
UARTP
(
ser
)
dump_b
=
dut
.
dump_ram
()
longest
=
compare_dumps
(
dump_a
,
dump_b
)
flash
()
print
(
" longest chunk of untouched memory =
%
d"
%
longest
)
fill_ram
()
eprint
(
"Flashed"
)
except
Exception
as
ex
:
time
.
sleep
(
0.1
)
print
(
"TEST FAILED"
)
raise
ex
def
stdin_read
(
n
):
b
=
sys
.
stdin
.
buffer
.
read
(
n
)
finally
:
if
len
(
b
)
!=
n
:
tool
.
end_measurement
()
sys
.
exit
(
1
)
sys
.
stdout
.
flush
()
return
b
sys
.
stderr
.
flush
()
def
stdin_readvar
():
l
=
stdin_read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
v
=
stdin_read
(
l
)
return
v
exp_hello
=
b
"Hello, World!"
hello
=
ser
.
read
(
ser
.
in_waiting
)
if
hello
[
-
13
:]
!=
exp_hello
:
eprint
(
"Improper board initialization message:
%
s"
%
hello
)
return
1
eprint
(
"Board initialized properly"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
sys
.
stdout
.
flush
()
while
1
:
action
=
stdin_read
(
1
)[
0
]
eprint
(
"Command
%
c from stdin"
%
action
)
if
action
in
b
"ackmps"
:
v
=
stdin_readvar
()
uartp
.
send
(
struct
.
pack
(
"B"
,
action
)
+
v
)
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged variable transfer"
)
eprint
(
"Var
%
c successfully sent to board"
%
action
)
elif
action
in
b
"ACKMPS"
:
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
v
=
uartp
.
recv
()
if
len
(
v
)
<
1
or
v
[
0
]
!=
action
:
raise
Exception
(
"Could not obtain variable from board"
)
v
=
v
[
1
:]
eprint
(
"Var
%
c received from board:
%
s"
%
(
action
,
v
.
hex
()))
l
=
struct
.
pack
(
"<I"
,
len
(
v
))
sys
.
stdout
.
buffer
.
write
(
l
+
v
)
sys
.
stdout
.
flush
()
elif
action
in
b
"ed"
:
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged variable transfer"
)
eprint
(
"Operation
%
c completed successfully"
%
action
)
elif
action
in
b
"u"
:
dump_ram
()
with
open
(
"ram_copy.bin"
,
'rb'
)
as
dump
:
dump_a
=
dump
.
read
()
with
open
(
"ram_dump.bin"
,
'rb'
)
as
dump
:
dump_b
=
dump
.
read
()
if
len
(
dump_a
)
!=
RAM_SIZE
or
len
(
dump_b
)
!=
RAM_SIZE
:
raise
Exception
(
"Wrong dump sizes: 0x
%
x, 0x
%
x"
%
(
len
(
dump_a
),
len
(
dump_b
)))
streaks
=
[]
streak_beg
=
0
streak_end
=
0
for
i
in
range
(
len
(
dump_a
)):
if
dump_a
[
i
]
==
dump_b
[
i
]:
streak_end
=
i
else
:
if
streak_end
!=
streak_beg
:
streaks
.
append
((
streak_beg
,
streak_end
))
streak_beg
=
i
streak_end
=
i
for
b
,
e
in
streaks
:
eprint
(
"equal bytes from 0x
%
x to 0x
%
x (length:
%
d)"
%
(
b
,
e
,
e
-
b
))
b
,
e
=
max
(
streaks
,
key
=
lambda
a
:
a
[
1
]
-
a
[
0
])
eprint
(
"longest equal bytes streak from 0x
%
x to 0x
%
x (length:
%
d)"
%
(
b
,
e
,
e
-
b
))
v
=
struct
.
pack
(
"<II"
,
4
,
e
-
b
)
sys
.
stdout
.
buffer
.
write
(
v
)
sys
.
stdout
.
flush
()
else
:
raise
Exception
(
"Unknown action
%
c"
%
action
)
return
0
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
...
...
templates/maixduino/test
View file @
4fa05cc7
...
@@ -3,191 +3,105 @@
...
@@ -3,191 +3,105 @@
import
os
import
os
import
sys
import
sys
import
time
import
time
import
struct
import
serial
import
subprocess
import
subprocess
import
serial.tools.list_ports
from
test_common
import
(
LogicMultiplexerTimeMeasurements
,
parse_nist_aead_test_vectors
,
DeviceUnderTestAeadUARTP
,
eprint
,
run_nist_aead_test_line
,
)
def
get_serial
():
import
serial.tools.list_ports
ports
=
serial
.
tools
.
list_ports
.
comports
()
ports
=
[
c
for
c
in
ports
if
c
.
product
==
'Sipeed-Debug'
]
ports
.
sort
(
key
=
lambda
d
:
d
.
location
)
return
ports
[
0
]
.
device
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
class
Maixduino
(
DeviceUnderTestAeadUARTP
):
def
flash
(
tty
=
None
):
def
__init__
(
self
,
build_dir
):
pipe
=
subprocess
.
PIPE
DeviceUnderTestAeadUARTP
.
__init__
(
self
)
cmd
=
[
'platformio'
,
'run'
,
'--target'
,
'upload'
]
if
tty
is
not
None
:
cmd
.
extend
([
'--upload-port'
,
tty
])
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
stdout
,
stderr
=
p
.
communicate
(
""
)
def
get_serial
():
self
.
build_dir
=
build_dir
import
serial.tools.list_ports
ports
=
serial
.
tools
.
list_ports
.
comports
()
def
flash
(
self
):
sipeed_devices
=
[
c
.
device
pipe
=
subprocess
.
PIPE
for
c
in
ports
previous_dir
=
os
.
path
.
abspath
(
os
.
curdir
)
if
c
.
product
==
'Sipeed-Debug'
]
os
.
chdir
(
self
.
build_dir
)
sipeed_devices
.
sort
()
cmd
=
[
'platformio'
,
'run'
,
'-e'
,
'sipeed-maixduino'
]
return
sipeed_devices
[
0
]
cmd
.
extend
([
'--target'
,
'upload'
])
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
class
UARTP
:
p
=
subprocess
.
Popen
(
def
__init__
(
self
,
ser
):
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
UARTP
.
SYN
=
0xf9
stdout
,
stderr
=
p
.
communicate
(
""
)
UARTP
.
FIN
=
0xf3
eprint
(
"Firmware flashed."
)
self
.
ser
=
ser
os
.
chdir
(
previous_dir
)
def
uart_read
(
self
):
def
dump_ram
(
self
):
r
=
self
.
ser
.
read
(
1
)
return
None
if
len
(
r
)
!=
1
:
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
eprint
(
"sent frame '
%
s'"
%
buf
.
hex
())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
l
=
self
.
uart_read
()
if
l
&
0x80
:
l
&=
0x7f
l
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
l
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
eprint
(
"rcvd frame '
%
s'"
%
buf
.
hex
())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
stdin_read
(
n
):
b
=
sys
.
stdin
.
buffer
.
read
(
n
)
if
len
(
b
)
!=
n
:
sys
.
exit
(
1
)
return
b
def
stdin_readvar
():
l
=
stdin_read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
v
=
stdin_read
(
l
)
return
v
def
main
(
argv
):
def
main
(
argv
):
eprint
(
argv
[
0
])
if
len
(
argv
)
!=
3
:
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
print
(
"Usage: test LWC_AEAD_KAT.txt build_dir"
)
if
len
(
script_dir
)
>
0
:
os
.
chdir
(
script_dir
)
dev
=
get_serial
()
flash
(
dev
)
eprint
(
"Flashed"
)
time
.
sleep
(
0.1
)
ser
=
serial
.
Serial
(
dev
,
baudrate
=
1500000
,
timeout
=
5
)
uartp
=
UARTP
(
ser
)
ser
.
setRTS
(
True
)
time
.
sleep
(
0.1
)
ser
.
setRTS
(
False
)
time
.
sleep
(
0.1
)
ser
.
setRTS
(
True
)
time
.
sleep
(
1
)
exp_hello
=
b
"Hello, World!"
hello
=
ser
.
read
(
len
(
exp_hello
))
if
hello
!=
exp_hello
:
eprint
(
"Improper board initialization message: "
)
return
1
return
1
eprint
(
"Board initialized properly"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
1
]))
sys
.
stdout
.
flush
()
build_dir
=
argv
[
2
]
while
1
:
dut
=
Maixduino
(
build_dir
)
action
=
stdin_read
(
1
)[
0
]
eprint
(
"Command
%
c from stdin"
%
action
)
try
:
tool
=
LogicMultiplexerTimeMeasurements
(
0x00c0
)
if
action
in
b
"ackmps"
:
tool
.
begin_measurement
()
v
=
stdin_readvar
()
uartp
.
send
(
struct
.
pack
(
"B"
,
action
)
+
v
)
dut
.
flash
()
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
ser
=
serial
.
Serial
(
raise
Exception
(
"Unacknowledged variable transfer"
)
get_serial
(),
eprint
(
"Var
%
c successfully sent to board"
%
action
)
baudrate
=
1500000
,
timeout
=
5
)
elif
action
in
b
"ACKMPS"
:
c
=
struct
.
pack
(
"B"
,
action
)
ser
.
setRTS
(
True
)
uartp
.
send
(
c
)
time
.
sleep
(
0.1
)
v
=
uartp
.
recv
()
ser
.
setRTS
(
False
)
if
len
(
v
)
<
1
or
v
[
0
]
!=
action
:
time
.
sleep
(
0.1
)
raise
Exception
(
"Could not obtain variable from board"
)
ser
.
setRTS
(
True
)
v
=
v
[
1
:]
time
.
sleep
(
1
)
eprint
(
"Var
%
c received from board:
%
s"
%
(
action
,
v
.
hex
()))
l
=
struct
.
pack
(
"<I"
,
len
(
v
))
dut
.
ser
=
ser
sys
.
stdout
.
buffer
.
write
(
l
+
v
)
sys
.
stdout
.
flush
()
dut
.
prepare
()
sys
.
stdout
.
write
(
"Board prepared
\n
"
)
elif
action
in
b
"ed"
:
sys
.
stdout
.
flush
()
c
=
struct
.
pack
(
"B"
,
action
)
uartp
.
send
(
c
)
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
ack
=
uartp
.
recv
()
tool
.
arm
()
if
len
(
ack
)
<
1
or
ack
[
0
]
!=
action
:
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
raise
Exception
(
"Unacknowledged variable transfer"
)
tool
.
unarm
()
eprint
(
"Operation
%
c completed successfully"
%
action
)
except
Exception
as
ex
:
else
:
print
(
"TEST FAILED"
)
raise
Exception
(
"Unknown action
%
c"
%
action
)
raise
ex
finally
:
return
0
tool
.
end_measurement
()
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
if
__name__
==
"__main__"
:
sys
.
exit
(
main
(
sys
.
argv
))
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
...
...
templates/uno/test
View file @
4fa05cc7
...
@@ -3,187 +3,97 @@
...
@@ -3,187 +3,97 @@
import
os
import
os
import
sys
import
sys
import
time
import
time
import
struct
import
serial
import
subprocess
import
subprocess
import
serial.tools.list_ports
from
test_common
import
(
LogicMultiplexerTimeMeasurements
,
parse_nist_aead_test_vectors
,
DeviceUnderTestAeadUARTP
,
eprint
,
run_nist_aead_test_line
,
)
def
get_serial
():
ports
=
serial
.
tools
.
list_ports
.
comports
()
devices
=
[
p
.
device
for
p
in
ports
if
(
p
.
vid
==
0x1A86
and
p
.
pid
==
0x7523
)
]
devices
.
sort
()
return
devices
[
0
]
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
class
Uno
(
DeviceUnderTestAeadUARTP
):
def
flash
(
tty
=
None
):
def
__init__
(
self
,
build_dir
):
pipe
=
subprocess
.
PIPE
DeviceUnderTestAeadUARTP
.
__init__
(
self
)
cmd
=
[
'platformio'
,
'run'
,
'-e'
,
'uno'
,
'--target'
,
'upload'
]
if
tty
is
not
None
:
cmd
.
extend
([
'--upload-port'
,
tty
])
p
=
subprocess
.
Popen
(
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
stdout
,
stderr
=
p
.
communicate
(
""
)
def
get_serial
():
self
.
build_dir
=
build_dir
import
serial.tools.list_ports
ports
=
serial
.
tools
.
list_ports
.
comports
()
def
flash
(
self
):
devices
=
[
p
.
device
for
p
in
ports
]
pipe
=
subprocess
.
PIPE
devices
.
sort
()
previous_dir
=
os
.
path
.
abspath
(
os
.
curdir
)
return
devices
[
-
1
]
os
.
chdir
(
self
.
build_dir
)
cmd
=
[
'platformio'
,
'run'
,
'-e'
,
'uno'
,
'--target'
,
'upload'
]
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
class
UARTP
:
cmd
.
extend
([
'--upload-port'
,
get_serial
()])
def
__init__
(
self
,
ser
):
p
=
subprocess
.
Popen
(
UARTP
.
SYN
=
0xf9
cmd
,
stdout
=
sys
.
stderr
,
stdin
=
pipe
)
UARTP
.
FIN
=
0xf3
stdout
,
stderr
=
p
.
communicate
(
""
)
self
.
ser
=
ser
eprint
(
"Firmware flashed."
)
os
.
chdir
(
previous_dir
)
def
uart_read
(
self
):
r
=
self
.
ser
.
read
(
1
)
def
dump_ram
(
self
):
if
len
(
r
)
!=
1
:
return
None
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
eprint
(
"sent frame '
%
s'"
%
buf
.
hex
())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
l
=
self
.
uart_read
()
if
l
&
0x80
:
l
&=
0x7f
l
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
l
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
eprint
(
"rcvd frame '
%
s'"
%
buf
.
hex
())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
stdin_read
(
n
):
b
=
sys
.
stdin
.
buffer
.
read
(
n
)
if
len
(
b
)
!=
n
:
sys
.
exit
(
1
)
return
b
def
stdin_readvar
():
l
=
stdin_read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
v
=
stdin_read
(
l
)
return
v
def
main
(
argv
):
def
main
(
argv
):
eprint
(
argv
[
0
])
if
len
(
argv
)
!=
3
:
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
print
(
"Usage: test LWC_AEAD_KAT.txt build_dir"
)
if
len
(
script_dir
)
>
0
:
os
.
chdir
(
script_dir
)
dev
=
get_serial
()
flash
(
dev
)
eprint
(
"Flashed"
)
time
.
sleep
(
0.1
)
ser
=
serial
.
Serial
(
dev
,
baudrate
=
115200
,
timeout
=
5
)
uartp
=
UARTP
(
ser
)
ser
.
setDTR
(
True
)
time
.
sleep
(
0.01
)
ser
.
setDTR
(
False
)
time
.
sleep
(
1
)
exp_hello
=
b
"Hello, World!"
hello
=
ser
.
read
(
len
(
exp_hello
))
if
hello
!=
exp_hello
:
eprint
(
"Improper board initialization message: "
)
return
1
return
1
eprint
(
"Board initialized properly"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
1
])
)
sys
.
stdout
.
flush
()
build_dir
=
argv
[
2
]
while
1
:
dut
=
Uno
(
build_dir
)
action
=
stdin_read
(
1
)[
0
]
eprint
(
"Command
%
c from stdin"
%
action
)
try
:
tool
=
LogicMultiplexerTimeMeasurements
(
0x0c00
)
if
action
in
b
"ackmps"
:
tool
.
begin_measurement
()
v
=
stdin_readvar
()
uartp
.
send
(
struct
.
pack
(
"B"
,
action
)
+
v
)
dut
.
flash
(
)
ack
=
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
ser
=
serial
.
Serial
(
raise
Exception
(
"Unacknowledged variable transfer"
)
get_serial
(),
eprint
(
"Var
%
c successfully sent to board"
%
action
)
baudrate
=
115200
,
timeout
=
5
)
elif
action
in
b
"ACKMPS"
:
c
=
struct
.
pack
(
"B"
,
action
)
ser
.
setDTR
(
True
)
uartp
.
send
(
c
)
time
.
sleep
(
0.01
)
v
=
uartp
.
recv
(
)
ser
.
setDTR
(
False
)
if
len
(
v
)
<
1
or
v
[
0
]
!=
action
:
time
.
sleep
(
1
)
raise
Exception
(
"Could not obtain variable from board"
)
v
=
v
[
1
:]
dut
.
ser
=
ser
eprint
(
"Var
%
c received from board:
%
s"
%
(
action
,
v
.
hex
()))
l
=
struct
.
pack
(
"<I"
,
len
(
v
)
)
dut
.
prepare
(
)
sys
.
stdout
.
buffer
.
write
(
l
+
v
)
sys
.
stdout
.
write
(
"Board prepared
\n
"
)
sys
.
stdout
.
flush
()
sys
.
stdout
.
flush
()
elif
action
in
b
"ed"
:
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
c
=
struct
.
pack
(
"B"
,
action
)
tool
.
arm
(
)
uartp
.
send
(
c
)
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
ack
=
uartp
.
recv
()
tool
.
unarm
()
if
len
(
ack
)
<
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged variable transfer"
)
except
Exception
as
ex
:
eprint
(
"Operation
%
c completed successfully"
%
action
)
print
(
"TEST FAILED"
)
raise
ex
else
:
raise
Exception
(
"Unknown action
%
c"
%
action
)
finally
:
tool
.
end_measurement
()
sys
.
stdout
.
flush
()
return
0
sys
.
stderr
.
flush
()
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
...
...
test.py
deleted
100755 → 0
View file @
12fe513c
#!/usr/bin/env python3
import
re
import
os
import
sys
import
struct
from
subprocess
import
Popen
,
PIPE
def
main
(
argv
):
speed_test
=
True
if
len
(
argv
)
<
3
:
print
(
"Usage: test.py LWC_AEAD_KAT.txt program [arguments]"
)
cmd
=
argv
[
2
:]
for
attempt
in
range
(
3
):
print
(
"beginning test
%
d of '
%
s' using test vectors '
%
s'"
%
(
attempt
,
' '
.
join
(
cmd
),
argv
[
1
]))
try
:
if
speed_test
:
measurements
=
begin_measurement
()
try
:
test
(
argv
[
1
],
cmd
)
finally
:
if
speed_test
:
end_measurement
(
measurements
)
print
(
"TEST SUCCESSFUL"
)
return
0
except
Exception
as
ex
:
print
(
str
(
ex
))
print
(
"TEST FAILED"
)
finally
:
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
return
1
def
test
(
test_file
,
cmd
,
ram_test
=
False
):
test_file
=
open
(
test_file
,
'r'
)
p
=
Popen
(
cmd
,
bufsize
=
0
,
stdin
=
PIPE
,
stdout
=
PIPE
)
def
write
(
data
):
l
=
p
.
stdin
.
write
(
data
)
if
len
(
data
)
!=
l
:
raise
Exception
(
"could not write
%
d bytes of data (put
%
d)"
%
(
len
(
data
),
l
))
def
read
(
l
):
if
l
==
0
:
return
b
""
data
=
p
.
stdout
.
read
(
l
)
if
len
(
data
)
==
0
:
print
(
"Unexpected end of stream"
,
file
=
sys
.
stderr
)
#sys.exit(1)
if
len
(
data
)
!=
l
:
raise
Exception
(
"could not read
%
d bytes of data (got
%
d)"
%
(
l
,
len
(
data
)))
return
data
def
submit
(
action
,
data
):
h
=
struct
.
pack
(
"<BI"
,
ord
(
action
),
len
(
data
))
write
(
h
)
write
(
data
)
def
obtain
():
l
=
read
(
4
)
(
l
,
)
=
struct
.
unpack
(
"<I"
,
l
)
return
read
(
l
)
output
=
read
(
14
)
if
output
!=
b
"Hello, World!
\n
"
:
raise
Exception
(
"Unexpected output:
%
s"
%
output
)
print
(
"Ready"
)
m
=
b
""
ad
=
b
""
k
=
b
""
npub
=
b
""
i
=
0
lineprog
=
re
.
compile
(
r"^\s*([A-Z]+)\s*=\s*(([0-9a-f])*)\s*$"
,
re
.
IGNORECASE
)
for
line
in
test_file
.
readlines
():
line
=
line
.
strip
()
res
=
lineprog
.
match
(
line
)
if
line
==
""
:
print
()
print
(
"Count =
%
d"
%
i
)
print
(
" m =
%
s"
%
m
.
hex
())
print
(
" ad =
%
s"
%
ad
.
hex
())
print
(
"npub =
%
s"
%
npub
.
hex
())
print
(
" k =
%
s"
%
k
.
hex
())
print
(
" c =
%
s"
%
c
.
hex
())
submit
(
'c'
,
b
"
\0
"
*
(
len
(
m
)
+
32
))
submit
(
's'
,
b
""
)
submit
(
'm'
,
m
)
submit
(
'a'
,
ad
)
submit
(
'k'
,
k
)
submit
(
'p'
,
npub
)
write
(
b
'e'
)
write
(
b
'C'
)
output
=
obtain
()
print
(
" c =
%
s"
%
output
.
hex
())
if
c
!=
output
:
raise
Exception
(
"output of encryption is different from expected ciphertext"
)
submit
(
'm'
,
b
"
\0
"
*
len
(
c
))
submit
(
's'
,
b
""
)
submit
(
'c'
,
c
)
submit
(
'a'
,
ad
)
submit
(
'k'
,
k
)
submit
(
'p'
,
npub
)
write
(
b
'd'
)
write
(
b
'M'
)
output
=
obtain
()
print
(
" m =
%
s"
%
output
.
hex
())
if
m
!=
output
:
raise
Exception
(
"output of encryption is different from expected ciphertext"
)
if
ram_test
:
# RAM test only tests the first test vector
write
(
b
'u'
)
output
=
obtain
()
print
(
" untouched memory =
%
d"
%
struct
.
unpack
(
"<I"
,
output
))
break
elif
res
is
not
None
:
if
res
[
1
]
.
lower
()
==
'count'
:
i
=
int
(
res
[
2
],
10
)
elif
res
[
1
]
.
lower
()
==
'key'
:
k
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'nonce'
:
npub
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'pt'
:
m
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'ad'
:
ad
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'ct'
:
c
=
bytes
.
fromhex
(
res
[
2
])
else
:
raise
Exception
(
"ERROR: unparsed line in test vectors file: '
%
s'"
%
res
)
else
:
raise
Exception
(
"ERROR: unparsed line in test vectors file: '
%
s'"
%
line
)
def
begin_measurement
():
import
saleae
import
time
sal
=
saleae
.
Saleae
()
# Channel 0 is reset
# Channel 1 is crypto_busy
sal
.
set_active_channels
([
0
,
1
],
[])
sal
.
set_sample_rate
(
sal
.
get_all_sample_rates
()[
0
])
sal
.
set_capture_seconds
(
6000
)
sal
.
capture_start
()
time
.
sleep
(
1
)
if
sal
.
is_processing_complete
():
raise
Exception
(
"Capture didn't start successfully"
)
return
sal
def
end_measurement
(
sal
):
import
time
if
sal
.
is_processing_complete
():
raise
Exception
(
"Capture finished before expected"
)
time
.
sleep
(
1
)
sal
.
capture_stop
();
time
.
sleep
(
.
1
)
for
attempt
in
range
(
3
):
if
not
sal
.
is_processing_complete
():
print
(
"Waiting for capture to complete..."
)
time
.
sleep
(
1
)
continue
outfile
=
"measurement_
%
s.csv"
%
time
.
strftime
(
"
%
Y
%
m
%
d-
%
H
%
M
%
S"
)
outfile
=
os
.
path
.
join
(
"measurements"
,
outfile
)
if
os
.
path
.
isfile
(
outfile
):
os
.
unlink
(
outfile
)
sal
.
export_data2
(
os
.
path
.
abspath
(
outfile
))
print
(
"Measurements written to '
%
s'"
%
outfile
)
mdbfile
=
os
.
path
.
join
(
"measurements"
,
"measurements.txt"
)
mdbfile
=
open
(
mdbfile
,
"a"
)
mdbfile
.
write
(
"
%
s >
%
s
\n
"
%
(
' '
.
join
(
sys
.
argv
),
outfile
))
mdbfile
.
close
()
return
0
raise
Exception
(
"Capture didn't complete successfully"
)
if
__name__
==
"__main__"
:
sys
.
exit
(
main
(
sys
.
argv
))
test_common.py
0 → 100644
View file @
4fa05cc7
#!/usr/bin/env python3
import
os
import
re
import
sys
import
time
import
struct
import
serial
def
eprint
(
*
args
,
**
kargs
):
print
(
*
args
,
file
=
sys
.
stderr
,
**
kargs
)
class
DeviceUnderTest
:
def
__init__
(
self
):
pass
def
flash
(
self
):
"""
This method should be overridden to flash the DUT.
"""
pass
def
dump_ram
(
self
):
"""
This should be overridden to return the RAM dump as a bytes string.
"""
return
None
class
DeviceUnderTestAeadUARTP
(
DeviceUnderTest
):
def
__init__
(
self
,
ser
=
None
):
self
.
ser
=
ser
def
prepare
(
self
):
exp_hello
=
b
"Hello, World!"
time
.
sleep
(
0.1
)
if
self
.
ser
.
in_waiting
<
13
:
time
.
sleep
(
2
)
hello
=
self
.
ser
.
read
(
self
.
ser
.
in_waiting
)
if
hello
[
-
13
:]
!=
exp_hello
:
raise
Exception
(
"Improper board initialization message:
%
s"
%
hello
)
self
.
uartp
=
UARTP
(
self
.
ser
)
def
send_var
(
self
,
key
,
value
):
self
.
uartp
.
send
(
struct
.
pack
(
"B"
,
key
)
+
value
)
ack
=
self
.
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
key
:
raise
Exception
(
"Unacknowledged variable transfer"
)
def
obtain_var
(
self
,
key
):
c
=
struct
.
pack
(
"B"
,
key
)
self
.
uartp
.
send
(
c
)
v
=
self
.
uartp
.
recv
()
if
len
(
v
)
<
1
or
v
[
0
]
!=
key
:
raise
Exception
(
"Could not obtain variable from board"
)
return
v
[
1
:]
def
do_cmd
(
self
,
action
):
c
=
struct
.
pack
(
"B"
,
action
)
self
.
uartp
.
send
(
c
)
ack
=
self
.
uartp
.
recv
()
if
len
(
ack
)
!=
1
or
ack
[
0
]
!=
action
:
raise
Exception
(
"Unacknowledged command"
)
class
UARTP
:
def
__init__
(
self
,
ser
):
UARTP
.
SYN
=
0xf9
UARTP
.
FIN
=
0xf3
self
.
ser
=
ser
def
uart_read
(
self
):
r
=
self
.
ser
.
read
(
1
)
if
len
(
r
)
!=
1
:
raise
Exception
(
"Serial read error"
)
return
r
[
0
]
def
uart_write
(
self
,
c
):
b
=
struct
.
pack
(
"B"
,
c
)
r
=
self
.
ser
.
write
(
b
)
if
r
!=
len
(
b
):
raise
Exception
(
"Serial write error"
)
return
r
def
send
(
self
,
buf
):
self
.
uart_write
(
UARTP
.
SYN
)
len_ind_0
=
0xff
&
len
(
buf
)
len_ind_1
=
0xff
&
(
len
(
buf
)
>>
7
)
if
len
(
buf
)
<
128
:
self
.
uart_write
(
len_ind_0
)
else
:
self
.
uart_write
(
len_ind_0
|
0x80
)
self
.
uart_write
(
len_ind_1
)
fcs
=
0
for
i
in
range
(
len
(
buf
)):
info
=
buf
[
i
]
fcs
=
(
fcs
+
info
)
&
0xff
self
.
uart_write
(
buf
[
i
])
fcs
=
(
0xff
-
fcs
)
&
0xff
self
.
uart_write
(
fcs
)
self
.
uart_write
(
UARTP
.
FIN
)
# eprint("sent frame '%s'" % buf.hex())
def
recv
(
self
):
tag_old
=
UARTP
.
FIN
while
1
:
tag
=
tag_old
while
1
:
if
tag_old
==
UARTP
.
FIN
:
if
tag
==
UARTP
.
SYN
:
break
tag_old
=
tag
tag
=
self
.
uart_read
()
tag_old
=
tag
pkt
=
self
.
uart_read
()
if
pkt
&
0x80
:
pkt
&=
0x7f
pkt
|=
self
.
uart_read
()
<<
7
fcs
=
0
buf
=
[]
for
i
in
range
(
pkt
):
info
=
self
.
uart_read
()
buf
.
append
(
info
)
fcs
=
(
fcs
+
info
)
&
0xff
fcs
=
(
fcs
+
self
.
uart_read
())
&
0xff
tag
=
self
.
uart_read
()
if
fcs
==
0xff
:
if
tag
==
UARTP
.
FIN
:
buf
=
bytes
(
buf
)
# eprint("rcvd frame '%s'" % buf.hex())
if
len
(
buf
)
>=
1
and
buf
[
0
]
==
0xde
:
sys
.
stderr
.
buffer
.
write
(
buf
[
1
:])
sys
.
stderr
.
flush
()
else
:
return
buf
def
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
):
eprint
()
eprint
(
"Count =
%
d"
%
i
)
eprint
(
" m =
%
s"
%
m
.
hex
())
eprint
(
" ad =
%
s"
%
ad
.
hex
())
eprint
(
"npub =
%
s"
%
npub
.
hex
())
eprint
(
" k =
%
s"
%
k
.
hex
())
eprint
(
" c =
%
s"
%
c
.
hex
())
dut
.
send_var
(
ord
(
'c'
),
b
"
\0
"
*
(
len
(
m
)
+
32
))
dut
.
send_var
(
ord
(
's'
),
b
""
)
dut
.
send_var
(
ord
(
'm'
),
m
)
dut
.
send_var
(
ord
(
'a'
),
ad
)
dut
.
send_var
(
ord
(
'k'
),
k
)
dut
.
send_var
(
ord
(
'p'
),
npub
)
dut
.
do_cmd
(
ord
(
'e'
))
output
=
dut
.
obtain_var
(
ord
(
'C'
))
print
(
" c =
%
s"
%
output
.
hex
())
if
c
!=
output
:
raise
Exception
(
"output of encryption is different from "
+
"expected ciphertext"
)
dut
.
send_var
(
ord
(
'm'
),
b
"
\0
"
*
len
(
c
))
dut
.
send_var
(
ord
(
's'
),
b
""
)
dut
.
send_var
(
ord
(
'c'
),
c
)
dut
.
send_var
(
ord
(
'a'
),
ad
)
dut
.
send_var
(
ord
(
'k'
),
k
)
dut
.
send_var
(
ord
(
'p'
),
npub
)
dut
.
do_cmd
(
ord
(
'd'
))
output
=
dut
.
obtain_var
(
ord
(
'M'
))
print
(
" m =
%
s"
%
output
.
hex
())
if
m
!=
output
:
raise
Exception
(
"output of encryption is different from "
+
"expected ciphertext"
)
def
compare_dumps
(
dump_a
,
dump_b
):
"""
Gets the length of the longes streaks of equal bytes in two RAM dumps
"""
streaks
=
[]
streak_beg
=
0
streak_end
=
0
for
i
in
range
(
len
(
dump_a
)):
if
dump_a
[
i
]
==
dump_b
[
i
]:
streak_end
=
i
else
:
if
streak_end
!=
streak_beg
:
streaks
.
append
((
streak_beg
,
streak_end
))
streak_beg
=
i
streak_end
=
i
for
b
,
e
in
streaks
:
eprint
(
"equal bytes from 0x
%
x to 0x
%
x (length:
%
d)"
%
(
b
,
e
,
e
-
b
))
b
,
e
=
max
(
streaks
,
key
=
lambda
a
:
a
[
1
]
-
a
[
0
])
eprint
(
"longest equal bytes streak from 0x
%
x to 0x
%
x (length:
%
d)"
%
(
b
,
e
,
e
-
b
))
return
e
-
b
def
parse_nist_aead_test_vectors
(
test_file_path
):
with
open
(
test_file_path
,
'r'
)
as
test_file
:
lineprog
=
re
.
compile
(
r"^\s*([A-Z]+)\s*=\s*(([0-9a-f])*)\s*$"
,
re
.
IGNORECASE
)
m
=
b
""
ad
=
b
""
k
=
b
""
npub
=
b
""
c
=
b
""
i
=
-
1
for
line
in
test_file
.
readlines
():
line
=
line
.
strip
()
res
=
lineprog
.
match
(
line
)
if
line
==
""
:
yield
i
,
m
,
ad
,
k
,
npub
,
c
m
=
b
""
ad
=
b
""
k
=
b
""
npub
=
b
""
c
=
b
""
elif
res
is
not
None
:
if
res
[
1
]
.
lower
()
==
'count'
:
i
=
int
(
res
[
2
],
10
)
elif
res
[
1
]
.
lower
()
==
'key'
:
k
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'nonce'
:
npub
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'pt'
:
m
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'ad'
:
ad
=
bytes
.
fromhex
(
res
[
2
])
elif
res
[
1
]
.
lower
()
==
'ct'
:
c
=
bytes
.
fromhex
(
res
[
2
])
else
:
raise
Exception
(
"ERROR: unparsed line in test vectors file: '
%
s'"
%
res
)
else
:
raise
Exception
(
"ERROR: unparsed line in test vectors file: '
%
s'"
%
line
)
class
TimeMeasurementTool
:
def
begin_measurement
(
self
):
pass
def
arm
(
self
):
pass
def
unarm
(
self
):
pass
def
end_measurement
(
self
):
pass
class
LogicMultiplexerTimeMeasurements
(
TimeMeasurementTool
):
def
__init__
(
self
,
mask
=
0xffffffffffffffff
):
self
.
mask
=
mask
self
.
sock
=
None
self
.
capture
=
[]
def
recv_samples
(
self
):
import
socket
capture
=
[]
while
1
:
try
:
rcvd
=
self
.
sock
.
recv
(
16
)
except
socket
.
timeout
:
break
except
BlockingIOError
:
break
if
len
(
rcvd
)
!=
16
:
raise
Exception
(
"Could not receive 16 bytes of logic sample!"
)
time
,
value
=
struct
.
unpack
(
"<dQ"
,
rcvd
)
eprint
(
"
%16.10
f:
%016
x"
%
(
time
,
value
))
capture
.
append
((
time
,
value
))
return
capture
def
begin_measurement
(
self
):
import
socket
server_addr
=
os
.
path
.
expandvars
(
'$XDG_RUNTIME_DIR/lwc-logic-socket'
)
self
.
sock
=
socket
.
socket
(
socket
.
AF_UNIX
,
socket
.
SOCK_STREAM
)
self
.
sock
.
settimeout
(
0
)
self
.
server_addr
=
server_addr
self
.
sock
.
connect
(
self
.
server_addr
)
self
.
sock
.
send
(
struct
.
pack
(
"<Q"
,
self
.
mask
))
def
arm
(
self
):
self
.
capture
.
extend
(
self
.
recv_samples
())
def
unarm
(
self
):
self
.
capture
.
extend
(
self
.
recv_samples
())
def
end_measurement
(
self
):
time
.
sleep
(
1
)
self
.
capture
.
extend
(
self
.
recv_samples
())
self
.
sock
.
close
()
class
SaleaeTimeMeasurements
(
TimeMeasurementTool
):
__slots__
=
[
'sal'
]
def
__init__
(
self
,
channels
=
[
0
,
1
]):
import
saleae
sal
=
saleae
.
Saleae
()
sal
.
set_active_channels
(
self
.
channels
,
[])
sal
.
set_sample_rate
(
sal
.
get_all_sample_rates
()[
0
])
sal
.
set_capture_seconds
(
6000
)
self
.
sal
=
sal
def
begin_measurement
(
self
):
# Channel 0 is reset
# Channel 1 is crypto_busy
import
time
sal
=
self
.
sal
sal
.
capture_start
()
time
.
sleep
(
1
)
if
sal
.
is_processing_complete
():
raise
Exception
(
"Capture didn't start successfully"
)
def
end_measurement
(
self
):
import
time
sal
=
self
.
sal
if
sal
.
is_processing_complete
():
raise
Exception
(
"Capture finished before expected"
)
time
.
sleep
(
1
)
sal
.
capture_stop
()
time
.
sleep
(
.
1
)
for
attempt
in
range
(
3
):
if
not
sal
.
is_processing_complete
():
print
(
"Waiting for capture to complete..."
)
time
.
sleep
(
1
)
continue
outfile
=
"measurement_
%
s.csv"
%
time
.
strftime
(
"
%
Y
%
m
%
d-
%
H
%
M
%
S"
)
outfile
=
os
.
path
.
join
(
"measurements"
,
outfile
)
if
os
.
path
.
isfile
(
outfile
):
os
.
unlink
(
outfile
)
sal
.
export_data2
(
os
.
path
.
abspath
(
outfile
))
print
(
"Measurements written to '
%
s'"
%
outfile
)
mdbfile
=
os
.
path
.
join
(
"measurements"
,
"measurements.txt"
)
mdbfile
=
open
(
mdbfile
,
"a"
)
mdbfile
.
write
(
"
%
s >
%
s
\n
"
%
(
' '
.
join
(
sys
.
argv
),
outfile
))
mdbfile
.
close
()
return
0
raise
Exception
(
"Capture didn't complete successfully"
)
def
main
(
argv
):
if
len
(
argv
)
<
3
:
print
(
"Usage: test_common.py port LWC_AEAD_KAT.txt"
)
eprint
(
argv
[
0
])
script_dir
=
os
.
path
.
split
(
argv
[
0
])[
0
]
if
len
(
script_dir
)
>
0
:
os
.
chdir
(
script_dir
)
kat
=
list
(
parse_nist_aead_test_vectors
(
argv
[
2
]))
dev
=
argv
[
1
]
ser
=
serial
.
Serial
(
dev
,
baudrate
=
115200
,
timeout
=
5
)
dut
=
DeviceUnderTestAeadUARTP
(
ser
)
try
:
tool
=
SaleaeTimeMeasurements
()
tool
.
begin_measurement
()
dut
.
flash
()
eprint
(
"Flashed"
)
dut
.
prepare
()
eprint
(
"Prepared"
)
sys
.
stdout
.
write
(
"Hello, World!
\n
"
)
sys
.
stdout
.
flush
()
dump_a
=
dut
.
dump_ram
()
for
i
,
m
,
ad
,
k
,
npub
,
c
in
kat
:
tool
.
arm
()
run_nist_aead_test_line
(
dut
,
i
,
m
,
ad
,
k
,
npub
,
c
)
tool
.
unarm
()
if
dump_a
is
not
None
and
i
==
1
:
dump_b
=
dut
.
dump_ram
()
longest
=
compare_dumps
(
dump_a
,
dump_b
)
print
(
" longest chunk of untouched memory =
%
d"
%
longest
)
except
Exception
as
ex
:
print
(
"TEST FAILED"
)
raise
ex
finally
:
tool
.
end_measurement
()
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
if
__name__
==
"__main__"
:
sys
.
exit
(
main
(
sys
.
argv
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment