Commit eabadd99 by lwc-tester

lock for bluepill

parent bce8b983
......@@ -21,6 +21,6 @@ source [find target/stm32f1x.cfg]
#tpiu config internal swodump.stm32f103-generic.log uart off 72000000
reset_config srst_only srst_push_pull srst_nogate connect_assert_srst
#reset_config none srst_push_pull srst_nogate
#reset_config srst_only srst_push_pull srst_nogate connect_assert_srst
reset_config none srst_push_pull srst_nogate
......@@ -2,7 +2,6 @@
import os
import sys
import subprocess
import serial.tools.list_ports
from test_common import (
LogicMultiplexerTimeMeasurements,
......@@ -10,6 +9,8 @@ from test_common import (
DeviceUnderTestAeadUARTP,
compare_dumps,
eprint,
OpenOcd,
FileMutex,
run_nist_aead_test_line,
)
......@@ -22,17 +23,18 @@ def get_serial():
if p.serial_number == 'FT2XCRZ1'
]
devices.sort()
return serial.Serial(
devices[0],
baudrate=115200,
timeout=5)
eprint("Serial port for BluePill is %s" % devices[0])
return devices[0]
class BluePill(DeviceUnderTestAeadUARTP):
RAM_SIZE = 0x5000
LOCK_PATH = '/var/lock/lwc-compare.bluepill.lock'
def __init__(self, build_dir):
DeviceUnderTestAeadUARTP.__init__(self, get_serial())
DeviceUnderTestAeadUARTP.__init__(self)
self.lock = FileMutex(BluePill.LOCK_PATH)
self.firmware_path = os.path.join(
build_dir, '.pio/build/bluepill_f103c8/firmware.elf')
......@@ -43,36 +45,45 @@ class BluePill(DeviceUnderTestAeadUARTP):
self.openocd_cfg_path = os.path.join(
build_dir, 'openocd.cfg')
self.ocd = OpenOcd(self.openocd_cfg_path)
def flash(self):
# pipe = subprocess.PIPE
cmd = [
'openocd', '-f', 'openocd.cfg', '-c',
'program %s verify reset exit' % self.firmware_path]
p = subprocess.Popen(
cmd, stdout=sys.stderr, stdin=sys.stdout)
stdout, stderr = p.communicate("")
ocd_cmd = 'program %s verify reset' % self.firmware_path
res = self.ocd.send(ocd_cmd)
eprint(res)
assert res == ''
eprint("Firmware flashed.")
cmd = [
'openocd', '-f', self.openocd_cfg_path, '-c',
'program %s reset exit 0x20000000' % self.ram_pattern_path]
p = subprocess.Popen(
cmd, stdout=sys.stderr, stdin=sys.stdout)
stdout, stderr = p.communicate("")
ocd_cmd = 'reset halt'
res = self.ocd.send(ocd_cmd)
eprint(res)
assert res == ''
ocd_cmd = 'load_image %s 0x20000000' % self.ram_pattern_path
res = self.ocd.send(ocd_cmd)
eprint(res)
assert res == ''
eprint("RAM flashed.")
ocd_cmd = 'resume'
res = self.ocd.send(ocd_cmd)
eprint(res)
assert res == ''
def reset(self, halt=False):
ocd_cmd = 'reset halt' if halt else 'reset run'
res = self.ocd.send(ocd_cmd)
eprint(res)
assert res == ''
eprint("Reset!")
def dump_ram(self):
cmd = [
'openocd', '-f', self.openocd_cfg_path,
'-c', 'init',
'-c', 'halt',
'-c', 'dump_image %s 0x20000000 0x%x' % (
self.ram_dump_path, BluePill.RAM_SIZE),
'-c', 'resume',
'-c', 'exit']
p = subprocess.Popen(
cmd, stdout=sys.stderr, stdin=sys.stdout)
stdout, stderr = p.communicate("")
res = self.ocd.send(
'dump_image %s 0x20000000 0x%x' %
(self.ram_dump_path, BluePill.RAM_SIZE))
eprint(res)
assert res == ''
eprint("RAM dumped.")
with open(self.ram_dump_path, 'rb') as ram:
ram = ram.read()
......@@ -93,17 +104,28 @@ def main(argv):
dut = BluePill(build_dir)
dut.reset()
dut.flash()
ser = serial.Serial(
get_serial(),
baudrate=115200,
timeout=5)
dut.reset()
dut.ser = ser
dut.prepare()
sys.stdout.write("Board prepared\n")
sys.stdout.flush()
dump_a = dut.dump_ram()
try:
tool = LogicMultiplexerTimeMeasurements(0x0003)
tool.begin_measurement()
dut.flash()
dut.prepare()
sys.stdout.write("Board prepared\n")
sys.stdout.flush()
dump_a = dut.dump_ram()
for i, m, ad, k, npub, c in kat:
tool.arm()
run_nist_aead_test_line(dut, i, m, ad, k, npub, c)
......@@ -120,8 +142,9 @@ def main(argv):
finally:
tool.end_measurement()
sys.stdout.flush()
sys.stderr.flush()
sys.stdout.flush()
sys.stderr.flush()
if __name__ == "__main__":
......
......@@ -4,8 +4,11 @@ import os
import re
import sys
import time
import fcntl
import struct
import serial
import socket
import subprocess
def eprint(*args, **kargs):
......@@ -410,5 +413,85 @@ def main(argv):
sys.stderr.flush()
class FileMutex:
def __init__(self, lock_path):
self.lock_path = lock_path
self.locked = False
self.lock_fd = None
eprint("Locking %s mutex" % lock_path)
self.lock_fd = open(lock_path, 'w')
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX)
self.locked = True
print('%d' % os.getpid(), file=self.lock_fd)
self.lock_fd.flush()
eprint("%s mutex locked." % lock_path)
def __del__(self):
if not self.locked:
return
eprint("Releasing %s mutex" % self.lock_path)
self.lock_fd.close()
self.locked = False
eprint("%s mutex released." % self.lock_path)
class OpenOcd:
def __init__(self, config_file, tcl_port=6666, verbose=False):
self.verbose = verbose
self.tclRpcIp = "127.0.0.1"
self.tclRpcPort = tcl_port
self.bufferSize = 4096
self.process = subprocess.Popen([
'openocd',
'-f', config_file,
'-c', 'tcl_port %d' % tcl_port,
'-c', 'gdb_port disabled',
'-c', 'telnet_port disabled',
], stderr=sys.stderr, stdout=sys.stderr)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while 1:
try:
self.sock.connect((self.tclRpcIp, self.tclRpcPort))
break
except Exception:
time.sleep(.1)
def __del__(self):
self.send('exit')
self.sock.close()
self.process.kill()
time.sleep(.1)
self.process.send_signal(9)
def send(self, cmd):
"""
Send a command string to TCL RPC. Return the result that was read.
"""
data = cmd.encode('ascii')
if self.verbose:
print("<- ", data)
self.sock.send(data + b"\x1a")
res = self._recv()
return res
def _recv(self):
"""
Read from the stream until the token (\x1a) was received.
"""
data = b''
while len(data) < 1 or data[-1] != 0x1a:
chunk = self.sock.recv(self.bufferSize)
data += chunk
data = data[:-1] # strip trailing \x1a
if self.verbose:
print("-> ", data)
return data.decode('ascii')
if __name__ == "__main__":
sys.exit(main(sys.argv))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment