test 4.36 KB
Newer Older
1 2 3 4
#!/usr/bin/env python3

import os
import sys
5 6
import struct
import serial
7 8
import subprocess

9 10 11
def eprint(*args, **kargs):
    print(*args, file=sys.stderr, **kargs)

12 13 14 15 16 17 18

def flash():
    pipe = subprocess.PIPE
    p = subprocess.Popen(['platformio', 'run', '-e', 'uno', '--target', 'upload'],
            stdout=sys.stderr, stdin=pipe)
    stdout, stderr = p.communicate("") 

19 20 21 22 23 24 25
def get_serial():
    import serial.tools.list_ports
    ports = serial.tools.list_ports.comports()
    devices = [ p.device for p in ports ]
    devices.sort()
    return devices[-1]

26
def main(argv):
27
    eprint(argv[0])
28 29 30 31
    script_dir = os.path.split(argv[0])[0]
    if len(script_dir) > 0:
        os.chdir(script_dir)
    flash()
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

    dev = get_serial()
    ser = serial.Serial(dev, baudrate=115200, timeout=5)

    def uart_read():
        r = ser.read(1)
        if len(r) != 1:
            raise Exception("Serial read error")
        return r[0]

    def uart_write(c):
        b = struct.pack("B", c)
        r = ser.write(b)
        if r != len(b):
            raise Exception("Serial write error")
        return r

    def uart_recvframe():
        while 1:
            tag_old = 0
            tag = 0xf3
            while tag_old != 0xf3 or tag != 0xf9:
                tag_old = tag
                tag = uart_read()
            tag_old = tag
            
            l = uart_read()
            if l & 0x80:
                l &= 0x7f
                l |= uart_read() << 7

            fcs = 0
            buf = []
            for i in range(l):
                info = uart_read()
                buf.append(info)
                fcs = (fcs + info) & 0xff
            fcs = (fcs + uart_read()) & 0xff

            tag = uart_read()
            if fcs == 0xff:
                if tag == 0xf3:
                    buf = bytes(buf)
                    if len(buf) >= 1 and buf[0] == 0xde:
                        sys.stderr.buffer.write(buf[1:])
                        sys.stderr.flush()
                    else:
                        return buf

    def uart_sendframe(buf):
        uart_write(0xf9)
        len_ind_0 = 0xff & len(buf)
        len_ind_1 = 0xff & (len(buf) >> 7)
        if len(buf) < 128:
            uart_write(len_ind_0)
        else:
            uart_write(len_ind_0 | 0x80)
            uart_write(len_ind_1)
        fcs = 0
        for i in range(len(buf)):
            info = buf[i]
            fcs = (fcs + info) & 0xff
            uart_write(buf[i])
        fcs = (0xff - fcs) & 0xff
        uart_write(fcs)
        uart_write(0xf3)

    def stdin_read(n):
        b = sys.stdin.buffer.read(n)
        if len(b) != n:
            sys.exit(1)
        return b

    def stdin_readvar():
        l = stdin_read(4)
        (l, ) = struct.unpack("<I", l)
        v = stdin_read(l)
        return v

    exp_hello = b"Hello, World!"
    hello = ser.read(len(exp_hello))
    if hello != exp_hello:
        eprint("Improper board initialization message: ")
        return 1
    eprint("Board initialized properly")
    sys.stdout.write("Hello, World!\n")
    sys.stdout.flush()

    while 1:
        action = stdin_read(1)[0]
        eprint("Command %c from stdin" % action)

        if action in b"ackmps":
            v = stdin_readvar()
            uart_sendframe(struct.pack("B", action) + v)
            ack = uart_recvframe()
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Var %c successfully sent to board" % action)

        elif action in b"ACKMPS":
            c = struct.pack("B", action)
            uart_sendframe(c)
            v = uart_recvframe()
            if len(v) < 1 or v[0] != action:
                raise Exception("Could not obtain variable from board")
            v = v[1:]
            eprint("Var %c received from board: %s" % (action, v.hex()))
            l = struct.pack("<I", len(v))
            sys.stdout.buffer.write(l + v)
            sys.stdout.flush()

        elif action in b"ed":
            c = struct.pack("B", action)
            uart_sendframe(c)
            ack = uart_recvframe()
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Operation %c completed successfully" % action)

        else:
            raise Exception("Unknown action %c" % action)
        
    
    return 0
157 158 159 160


if __name__ == "__main__":
    sys.exit(main(sys.argv))