Files
DTU-RemtoeLCD/remo_disp_client.py

323 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端
协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致):
- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC]
- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC]
- CRC: 数据区异或校验(不含 Tag、功能码、长度
- 端口: 7003
"""
import socket
import struct
import threading
import time
import argparse
from typing import Optional, Tuple, Callable
# 协议常量(与 RemoDispBus.h 一致)
TAG_CLIENT = 0xAA # 工具侧发送
TAG_DEVICE = 0xBB # 装置侧发送
PORT = 7003
# 功能码
CMD_KEEPLIVE = 0
CMD_INIT = 1
CMD_KEY = 2
CMD_LCDMEM = 3
CMD_ACK = 4
CMD_NCK = 5
CMD_IDLE = 0xFF
# 按键值(与 RemoDispBus.h 一致)
KEY_U = 0x02 # 上
KEY_D = 0x40 # 下
KEY_L = 0x10 # 左
KEY_R = 0x08 # 右
KEY_ENT = 0x20 # 确认
KEY_ESC = 0x01 # 取消
KEY_F1 = 0x04
KEY_F2 = 0x80
KEY_NONE = 0
def calc_crc(data: bytes) -> int:
"""异或校验,与 RDisp_CalCrc 一致"""
crc = 0
for b in data:
crc ^= b
return crc & 0xFF
def build_frame(cmd: int, data: bytes) -> bytes:
"""组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]"""
length = len(data)
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
crc = calc_crc(data)
return header + data + bytes([crc])
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
"""解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]"""
if len(raw) < 6:
return None
if raw[0] != TAG_DEVICE:
return None
cmd = raw[1]
length = (raw[2] << 8) | raw[3]
if len(raw) < 5 + length + 1:
return None
data = raw[4:4 + length]
crc_recv = raw[4 + length]
if calc_crc(data) != crc_recv:
return None
return (cmd, data)
class RemoDispClient:
"""远程显示 TCP 客户端"""
def __init__(self, host: str, port: int = PORT, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._sock: Optional[socket.socket] = None
self._init_info: Optional[dict] = None
def connect(self) -> bool:
"""连接装置"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
def _send(self, cmd: int, data: bytes = b"") -> bool:
"""发送报文"""
if not self._sock:
return False
frame = build_frame(cmd, data)
try:
self._sock.sendall(frame)
return True
except Exception as e:
print(f"发送失败: {e}")
return False
def _recv(self) -> Optional[Tuple[int, bytes]]:
"""接收并解析一帧"""
if not self._sock:
return None
try:
header = self._sock.recv(5)
if len(header) < 5:
return None
length = (header[2] << 8) | header[3]
rest = self._sock.recv(length + 1)
if len(rest) < length + 1:
return None
raw = header + rest
return parse_frame(raw)
except socket.timeout:
return None
except Exception as e:
print(f"接收失败: {e}")
return None
def request_init(self) -> Optional[dict]:
"""召唤初始化,解析 LCD 尺寸、显存大小、按键映射"""
if not self._send(CMD_INIT):
return None
result = self._recv()
if not result or result[0] != CMD_INIT:
return None
data = result[1]
if len(data) < 29:
return None
# 与 RDisp_Form_InitInf 格式一致
width = (data[0] << 8) | data[1]
height = (data[2] << 8) | data[3]
lcd_type = data[4]
mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8]
rgb_red = struct.unpack(">I", data[9:13])[0]
rgb_green = struct.unpack(">I", data[13:17])[0]
rgb_blue = struct.unpack(">I", data[17:21])[0]
keys = list(data[21:29])
self._init_info = {
"width": width,
"height": height,
"lcd_type": lcd_type,
"mem_size": mem_size,
"rgb_red": rgb_red,
"rgb_green": rgb_green,
"rgb_blue": rgb_blue,
"keys": keys,
}
return self._init_info
def send_key(self, key_value: int) -> bool:
"""下发按键"""
return self._send(CMD_KEY, bytes([key_value & 0xFF]))
def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]:
"""召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)"""
data = struct.pack(">I", start_pos)
if not self._send(CMD_LCDMEM, data):
return None
result = self._recv()
if not result or result[0] != CMD_LCDMEM:
return None
payload = result[1]
if len(payload) < 4:
return None
pos = struct.unpack(">I", payload[:4])[0]
mem_data = payload[4:]
return (pos + len(mem_data), mem_data)
def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]:
"""拉取完整显存,支持断点续传"""
info = self._init_info or self.request_init()
if not info:
return None
mem_size = info["mem_size"]
buffer = bytearray(mem_size)
pos = 0
while pos < mem_size:
result = self.request_lcdmem(pos)
if not result:
return None
next_pos, chunk = result
buffer[pos : pos + len(chunk)] = chunk
pos = next_pos
if on_progress:
on_progress(pos, mem_size)
return bytes(buffer)
def send_keepalive(self) -> bool:
"""发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)"""
return self._send(CMD_KEEPLIVE)
def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]":
"""单色显存转 PIL Image1bit -> 黑白图)"""
try:
from PIL import Image
except ImportError:
return None
img = Image.new("1", (width, height))
pix = img.load()
for y in range(height):
for x in range(width):
byte_idx = (y * width + x) // 8
bit_idx = 7 - (x % 8)
if byte_idx < len(data):
pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0
else:
pix[x, y] = 0
return img
def main():
parser = argparse.ArgumentParser(description="远程显示通信工具")
parser.add_argument("host", help="装置 IP 地址")
parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})")
parser.add_argument("--init", action="store_true", help="仅获取初始化信息")
parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png")
parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2")
parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE")
args = parser.parse_args()
key_map = {
"U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R,
"ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2,
}
client = RemoDispClient(args.host, args.port)
if not client.connect():
return 1
try:
if args.init:
info = client.request_init()
if info:
print("初始化信息:")
print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}")
print(f" 显存大小: {info['mem_size']} 字节")
print(f" 按键映射: {info['keys']}")
else:
print("获取初始化失败")
return 1
elif args.key:
k = key_map.get(args.key.upper(), KEY_NONE)
if k == KEY_NONE:
print(f"未知按键: {args.key}")
return 1
if client.send_key(k):
print(f"已发送按键: {args.key}")
else:
print("发送按键失败")
return 1
elif args.keepalive:
if client.send_keepalive():
print("已发送 KEEPLIVE")
else:
print("发送失败")
return 1
elif args.screen:
def progress(cur, total):
print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="")
data = client.fetch_full_screen(on_progress=progress)
if not data:
print("\n拉取显存失败")
return 1
info = client._init_info or {}
w = info.get("width", 160)
h = info.get("height", 160)
img = mono_to_image(data, w, h)
if img:
img.save("screen.png")
print(f"\n已保存 screen.png ({w}x{h})")
else:
with open("screen.raw", "wb") as f:
f.write(data)
print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)")
else:
# 默认:获取初始化并拉取一帧显存
info = client.request_init()
if info:
print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节")
result = client.request_lcdmem(0)
if result:
next_pos, chunk = result
print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}")
else:
print("召唤显存失败")
finally:
client.disconnect()
return 0
if __name__ == "__main__":
exit(main())