test 4.75 KB
Newer Older
1 2 3 4
#!/usr/bin/env python3

import os
import sys
5 6
import struct
import serial
7 8
import subprocess

9 10


11 12 13
def eprint(*args, **kargs):
    print(*args, file=sys.stderr, **kargs)

14 15 16 17 18 19 20

def flash():
    pipe = subprocess.PIPE
    p = subprocess.Popen(['platformio', 'run', '-e', 'uno', '--target', 'upload'],
            stdout=sys.stderr, stdin=pipe)
    stdout, stderr = p.communicate("") 

21 22 23 24 25 26 27 28
def get_serial():
    import serial.tools.list_ports
    ports = serial.tools.list_ports.comports()
    devices = [ p.device for p in ports ]
    devices.sort()
    return devices[-1]


29 30 31 32 33
class UARTP:
    def __init__(self, ser):
        UARTP.SYN = 0xf9
        UARTP.FIN = 0xf3
        self.ser = ser
34

35 36
    def uart_read(self):
        r = self.ser.read(1)
37 38 39 40
        if len(r) != 1:
            raise Exception("Serial read error")
        return r[0]

41
    def uart_write(self, c):
42
        b = struct.pack("B", c)
43
        r = self.ser.write(b)
44 45 46 47
        if r != len(b):
            raise Exception("Serial write error")
        return r

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
    def send(self, buf):
        self.uart_write(UARTP.SYN)
        len_ind_0 = 0xff & len(buf)
        len_ind_1 = 0xff & (len(buf) >> 7)
        if len(buf) < 128:
            self.uart_write(len_ind_0)
        else:
            self.uart_write(len_ind_0 | 0x80)
            self.uart_write(len_ind_1)
        fcs = 0
        for i in range(len(buf)):
            info = buf[i]
            fcs = (fcs + info) & 0xff
            self.uart_write(buf[i])
        fcs = (0xff - fcs) & 0xff
        self.uart_write(fcs)
        self.uart_write(UARTP.FIN)
        eprint("sent frame '%s'" % buf.hex())

    def recv(self):
        tag_old = UARTP.FIN
69
        while 1:
70 71 72 73 74
            tag = tag_old
            while 1:
                if tag_old == UARTP.FIN:
                    if tag == UARTP.SYN:
                        break
75
                tag_old = tag
76
                tag = self.uart_read()
77 78
            tag_old = tag
            
79
            l = self.uart_read()
80 81
            if l & 0x80:
                l &= 0x7f
82
                l |= self.uart_read() << 7
83 84 85 86

            fcs = 0
            buf = []
            for i in range(l):
87
                info = self.uart_read()
88 89
                buf.append(info)
                fcs = (fcs + info) & 0xff
90
            fcs = (fcs + self.uart_read()) & 0xff
91

92
            tag = self.uart_read()
93
            if fcs == 0xff:
94
                if tag == UARTP.FIN:
95
                    buf = bytes(buf)
96
                    eprint("rcvd frame '%s'" % buf.hex())
97 98 99 100 101 102
                    if len(buf) >= 1 and buf[0] == 0xde:
                        sys.stderr.buffer.write(buf[1:])
                        sys.stderr.flush()
                    else:
                        return buf

103 104 105 106 107 108 109 110 111 112
def main(argv):
    eprint(argv[0])
    script_dir = os.path.split(argv[0])[0]
    if len(script_dir) > 0:
        os.chdir(script_dir)
    flash()

    dev = get_serial()
    ser = serial.Serial(dev, baudrate=115200, timeout=5)
    uartp = UARTP(ser)
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140

    def stdin_read(n):
        b = sys.stdin.buffer.read(n)
        if len(b) != n:
            sys.exit(1)
        return b

    def stdin_readvar():
        l = stdin_read(4)
        (l, ) = struct.unpack("<I", l)
        v = stdin_read(l)
        return v

    exp_hello = b"Hello, World!"
    hello = ser.read(len(exp_hello))
    if hello != exp_hello:
        eprint("Improper board initialization message: ")
        return 1
    eprint("Board initialized properly")
    sys.stdout.write("Hello, World!\n")
    sys.stdout.flush()

    while 1:
        action = stdin_read(1)[0]
        eprint("Command %c from stdin" % action)

        if action in b"ackmps":
            v = stdin_readvar()
141 142
            uartp.send(struct.pack("B", action) + v)
            ack = uartp.recv()
143 144 145 146 147 148
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Var %c successfully sent to board" % action)

        elif action in b"ACKMPS":
            c = struct.pack("B", action)
149 150
            uartp.send(c)
            v = uartp.recv()
151 152 153 154 155 156 157 158 159 160
            if len(v) < 1 or v[0] != action:
                raise Exception("Could not obtain variable from board")
            v = v[1:]
            eprint("Var %c received from board: %s" % (action, v.hex()))
            l = struct.pack("<I", len(v))
            sys.stdout.buffer.write(l + v)
            sys.stdout.flush()

        elif action in b"ed":
            c = struct.pack("B", action)
161 162
            uartp.send(c)
            ack = uartp.recv()
163 164 165 166 167 168 169 170 171
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Operation %c completed successfully" % action)

        else:
            raise Exception("Unknown action %c" % action)
        
    
    return 0
172 173 174 175


if __name__ == "__main__":
    sys.exit(main(sys.argv))