更新 Readme 的内容,删除不必要的文件
This commit is contained in:
130
README.md
130
README.md
@@ -1,23 +1,49 @@
|
||||
# DTU-RemoteLCD - 远程显示通信工具
|
||||
|
||||
网络调试的远程显示 LCD 模拟软件。与 `HMI/RemoeDisp/RemoDispBus.c` 协议兼容的 Python 客户端,用于远程查看装置 LCD 显存并模拟按键。
|
||||
与 **RemoDispBus** 协议兼容的远程 LCD 显示与按键模拟工具。通过网络连接 DTU 装置,实时查看 LCD 显存画面并模拟按键操作,适用于调试、监控等场景。
|
||||
|
||||
## 协议说明
|
||||
## 功能特性
|
||||
|
||||
- **端口**: 7003
|
||||
- **报文格式**:
|
||||
- 工具 → 装置: `[0xAA][功能码][长度高][长度低][数据...][CRC]`
|
||||
- 装置 → 工具: `[0xBB][功能码][长度高][长度低][数据...][CRC]`
|
||||
- **CRC**: 数据区异或校验
|
||||
- **功能码**: KEEPLIVE=0, INIT=1, KEY=2, LCDMEM=3
|
||||
- **实时屏幕显示**:将装置 LCD 显存以 1bpp 位图形式拉取并渲染
|
||||
- **按键模拟**:支持方向键、确认、返回、复归等按键
|
||||
- **多种使用方式**:命令行、GUI、Web 浏览器
|
||||
- **WebSocket 传输**:Web 版采用 Socket.IO,服务端主动推送,低延迟
|
||||
|
||||
## 命令行工具 (remo_disp_client.py)
|
||||
## 环境要求
|
||||
|
||||
- Python 3.7+
|
||||
- 与 DTU 装置网络互通(默认端口 7003)
|
||||
|
||||
## 快速开始
|
||||
|
||||
### 1. 安装依赖
|
||||
|
||||
```bash
|
||||
# 安装可选依赖(用于保存 PNG)
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
# 获取初始化信息
|
||||
### 2. Web 界面(推荐)
|
||||
|
||||
```bash
|
||||
python remo_disp_server.py [装置IP]
|
||||
```
|
||||
|
||||
启动后访问 **http://localhost:8181**,在页面中:
|
||||
- 点击「连接」→ 输入装置 IP 和端口(默认 7003)→ 连接
|
||||
- 左侧显示装置 LCD 画面(约 100ms 刷新)
|
||||
- 右侧使用方向键、确认、复归、返回进行按键操作
|
||||
- 点击「断开」断开与装置的连接
|
||||
|
||||
**可选预连接**:启动时传入 IP 可预连接装置,例如:
|
||||
|
||||
```bash
|
||||
python remo_disp_server.py 192.168.253.3
|
||||
```
|
||||
|
||||
### 3. 命令行工具
|
||||
|
||||
```bash
|
||||
# 获取初始化信息(屏幕宽高、显存大小)
|
||||
python remo_disp_client.py 192.168.1.100 --init
|
||||
|
||||
# 拉取显存并保存为 screen.png
|
||||
@@ -28,9 +54,12 @@ python remo_disp_client.py 192.168.1.100 --key ENT
|
||||
|
||||
# 发送 KEEPLIVE
|
||||
python remo_disp_client.py 192.168.1.100 --keepalive
|
||||
|
||||
# 指定端口
|
||||
python remo_disp_client.py 192.168.1.100 -p 7003 --screen
|
||||
```
|
||||
|
||||
## GUI 查看器 (remo_disp_viewer.py)
|
||||
### 4. GUI 查看器
|
||||
|
||||
```bash
|
||||
python remo_disp_viewer.py 192.168.1.100
|
||||
@@ -40,27 +69,70 @@ python remo_disp_viewer.py 192.168.1.100
|
||||
- 点击按键模拟远程按键
|
||||
- 点击「刷新」重新拉取显存
|
||||
|
||||
## Web 界面 (remo_disp_server.py + remo_disp_ui.html)
|
||||
## 项目结构
|
||||
|
||||
```bash
|
||||
pip install flask
|
||||
python remo_disp_server.py
|
||||
```
|
||||
DTU-RemoteLCD/
|
||||
├── remo_disp_server.py # Web 服务(Flask + Socket.IO)
|
||||
├── remo_disp_ui.html # Web 前端页面
|
||||
├── remo_disp_client.py # 命令行客户端
|
||||
├── remo_disp_viewer.py # Tkinter GUI 查看器
|
||||
├── requirements.txt # Python 依赖
|
||||
├── static/ # 静态资源(可选)
|
||||
│ └── favicon.ico # 网站图标
|
||||
└── README.md
|
||||
```
|
||||
|
||||
启动后访问 http://localhost:8080 ,界面包含:
|
||||
- 顶部菜单:连接、设置、退出、关于
|
||||
- 左侧显示区:装置 LCD 画面(约 500ms 刷新)
|
||||
- 右侧控制:方向键 + 确认、复归、返回
|
||||
## 协议说明(RemoDispBus)
|
||||
|
||||
与 `HMI/RemoeDisp/RemoDispBus.c` 兼容:
|
||||
|
||||
| 项目 | 说明 |
|
||||
|------|------|
|
||||
| **端口** | 7003 |
|
||||
| **工具 → 装置** | `[0xAA][功能码][长度高][长度低][数据...][CRC]` |
|
||||
| **装置 → 工具** | `[0xBB][功能码][长度高][长度低][数据...][CRC]` |
|
||||
| **CRC** | 数据区逐字节异或,取低 8 位 |
|
||||
|
||||
**功能码**:
|
||||
|
||||
| 码 | 名称 | 说明 |
|
||||
|----|------|------|
|
||||
| 0 | KEEPLIVE | 保活 |
|
||||
| 1 | INIT | 初始化,获取屏幕宽高、显存大小 |
|
||||
| 2 | KEY | 按键 |
|
||||
| 3 | LCDMEM | 读取显存(屏幕画面) |
|
||||
|
||||
## 按键映射
|
||||
|
||||
| 键名 | 值 |
|
||||
| 键名 | 协议码 | 说明 |
|
||||
|------|--------|------|
|
||||
| U | 0x02 | 上 |
|
||||
| D | 0x40 | 下 |
|
||||
| L | 0x10 | 左 |
|
||||
| R | 0x08 | 右 |
|
||||
| ENT | 0x20 | 确认 |
|
||||
| ESC | 0x01 | 返回/取消 |
|
||||
| F1/RESET | 0x04 | 复归 |
|
||||
| F2 | 0x80 | F2 |
|
||||
|
||||
## 依赖说明
|
||||
|
||||
| 依赖 | 用途 |
|
||||
|------|------|
|
||||
| 上 | 0x02 |
|
||||
| 下 | 0x40 |
|
||||
| 左 | 0x10 |
|
||||
| 右 | 0x08 |
|
||||
| 确认 | 0x20 |
|
||||
| 取消 | 0x01 |
|
||||
| F1 | 0x04 |
|
||||
| F2 | 0x80 |
|
||||
| Flask | Web 框架 |
|
||||
| flask-socketio | WebSocket 支持 |
|
||||
| python-socketio | Socket.IO 服务端 |
|
||||
| Pillow | 1bpp 位图转 PNG |
|
||||
|
||||
`remo_disp_client.py` 仅需 Python 标准库;`remo_disp_viewer.py` 使用 Tkinter(Python 自带)。
|
||||
|
||||
## 配置说明
|
||||
|
||||
- **Web 服务端口**:默认 8181,在 `remo_disp_server.py` 的 `main()` 中修改
|
||||
- **装置端口**:默认 7003,连接时可输入自定义端口
|
||||
- **Favicon**:将 `favicon.ico` 放入 `static/` 目录即可显示网站图标
|
||||
|
||||
## 许可证
|
||||
|
||||
本项目仅供内部调试使用。
|
||||
|
||||
@@ -1,322 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
远程显示通信工具 - 与 RemoDispBus 协议兼容的 Python 客户端
|
||||
|
||||
协议说明(与 HMI/RemoeDisp/RemoDispBus.c 一致):
|
||||
- 工具 → 装置: [0xAA][功能码][长度高][长度低][数据...][CRC]
|
||||
- 装置 → 工具: [0xBB][功能码][长度高][长度低][数据...][CRC]
|
||||
- CRC: 数据区异或校验(不含 Tag、功能码、长度)
|
||||
- 端口: 7003
|
||||
"""
|
||||
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
import argparse
|
||||
from typing import Optional, Tuple, Callable
|
||||
|
||||
# 协议常量(与 RemoDispBus.h 一致)
|
||||
TAG_CLIENT = 0xAA # 工具侧发送
|
||||
TAG_DEVICE = 0xBB # 装置侧发送
|
||||
PORT = 7003
|
||||
|
||||
# 功能码
|
||||
CMD_KEEPLIVE = 0
|
||||
CMD_INIT = 1
|
||||
CMD_KEY = 2
|
||||
CMD_LCDMEM = 3
|
||||
CMD_ACK = 4
|
||||
CMD_NCK = 5
|
||||
CMD_IDLE = 0xFF
|
||||
|
||||
# 按键值(与 RemoDispBus.h 一致)
|
||||
KEY_U = 0x02 # 上
|
||||
KEY_D = 0x40 # 下
|
||||
KEY_L = 0x10 # 左
|
||||
KEY_R = 0x08 # 右
|
||||
KEY_ENT = 0x20 # 确认
|
||||
KEY_ESC = 0x01 # 取消
|
||||
KEY_F1 = 0x04
|
||||
KEY_F2 = 0x80
|
||||
KEY_NONE = 0
|
||||
|
||||
|
||||
def calc_crc(data: bytes) -> int:
|
||||
"""异或校验,与 RDisp_CalCrc 一致"""
|
||||
crc = 0
|
||||
for b in data:
|
||||
crc ^= b
|
||||
return crc & 0xFF
|
||||
|
||||
|
||||
def build_frame(cmd: int, data: bytes) -> bytes:
|
||||
"""组帧:工具→装置 [0xAA][cmd][len_hi][len_lo][data][crc]"""
|
||||
length = len(data)
|
||||
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
|
||||
crc = calc_crc(data)
|
||||
return header + data + bytes([crc])
|
||||
|
||||
|
||||
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
|
||||
"""解析装置回复:[0xBB][cmd][len_hi][len_lo][data][crc]"""
|
||||
if len(raw) < 6:
|
||||
return None
|
||||
if raw[0] != TAG_DEVICE:
|
||||
return None
|
||||
cmd = raw[1]
|
||||
length = (raw[2] << 8) | raw[3]
|
||||
if len(raw) < 5 + length + 1:
|
||||
return None
|
||||
data = raw[4:4 + length]
|
||||
crc_recv = raw[4 + length]
|
||||
if calc_crc(data) != crc_recv:
|
||||
return None
|
||||
return (cmd, data)
|
||||
|
||||
|
||||
class RemoDispClient:
|
||||
"""远程显示 TCP 客户端"""
|
||||
|
||||
def __init__(self, host: str, port: int = PORT, timeout: float = 5.0):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self._sock: Optional[socket.socket] = None
|
||||
self._init_info: Optional[dict] = None
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""连接装置"""
|
||||
try:
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.settimeout(self.timeout)
|
||||
self._sock.connect((self.host, self.port))
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"连接失败: {e}")
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""断开连接"""
|
||||
if self._sock:
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._sock = None
|
||||
|
||||
def _send(self, cmd: int, data: bytes = b"") -> bool:
|
||||
"""发送报文"""
|
||||
if not self._sock:
|
||||
return False
|
||||
frame = build_frame(cmd, data)
|
||||
try:
|
||||
self._sock.sendall(frame)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"发送失败: {e}")
|
||||
return False
|
||||
|
||||
def _recv(self) -> Optional[Tuple[int, bytes]]:
|
||||
"""接收并解析一帧"""
|
||||
if not self._sock:
|
||||
return None
|
||||
try:
|
||||
header = self._sock.recv(5)
|
||||
if len(header) < 5:
|
||||
return None
|
||||
length = (header[2] << 8) | header[3]
|
||||
rest = self._sock.recv(length + 1)
|
||||
if len(rest) < length + 1:
|
||||
return None
|
||||
raw = header + rest
|
||||
return parse_frame(raw)
|
||||
except socket.timeout:
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"接收失败: {e}")
|
||||
return None
|
||||
|
||||
def request_init(self) -> Optional[dict]:
|
||||
"""召唤初始化,解析 LCD 尺寸、显存大小、按键映射"""
|
||||
if not self._send(CMD_INIT):
|
||||
return None
|
||||
result = self._recv()
|
||||
if not result or result[0] != CMD_INIT:
|
||||
return None
|
||||
data = result[1]
|
||||
if len(data) < 29:
|
||||
return None
|
||||
# 与 RDisp_Form_InitInf 格式一致
|
||||
width = (data[0] << 8) | data[1]
|
||||
height = (data[2] << 8) | data[3]
|
||||
lcd_type = data[4]
|
||||
mem_size = (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8]
|
||||
rgb_red = struct.unpack(">I", data[9:13])[0]
|
||||
rgb_green = struct.unpack(">I", data[13:17])[0]
|
||||
rgb_blue = struct.unpack(">I", data[17:21])[0]
|
||||
keys = list(data[21:29])
|
||||
self._init_info = {
|
||||
"width": width,
|
||||
"height": height,
|
||||
"lcd_type": lcd_type,
|
||||
"mem_size": mem_size,
|
||||
"rgb_red": rgb_red,
|
||||
"rgb_green": rgb_green,
|
||||
"rgb_blue": rgb_blue,
|
||||
"keys": keys,
|
||||
}
|
||||
return self._init_info
|
||||
|
||||
def send_key(self, key_value: int) -> bool:
|
||||
"""下发按键"""
|
||||
return self._send(CMD_KEY, bytes([key_value & 0xFF]))
|
||||
|
||||
def request_lcdmem(self, start_pos: int = 0) -> Optional[Tuple[int, bytes]]:
|
||||
"""召唤显存,从 start_pos 起。返回 (下一位置, 显存数据)"""
|
||||
data = struct.pack(">I", start_pos)
|
||||
if not self._send(CMD_LCDMEM, data):
|
||||
return None
|
||||
result = self._recv()
|
||||
if not result or result[0] != CMD_LCDMEM:
|
||||
return None
|
||||
payload = result[1]
|
||||
if len(payload) < 4:
|
||||
return None
|
||||
pos = struct.unpack(">I", payload[:4])[0]
|
||||
mem_data = payload[4:]
|
||||
return (pos + len(mem_data), mem_data)
|
||||
|
||||
def fetch_full_screen(self, on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[bytes]:
|
||||
"""拉取完整显存,支持断点续传"""
|
||||
info = self._init_info or self.request_init()
|
||||
if not info:
|
||||
return None
|
||||
mem_size = info["mem_size"]
|
||||
buffer = bytearray(mem_size)
|
||||
pos = 0
|
||||
while pos < mem_size:
|
||||
result = self.request_lcdmem(pos)
|
||||
if not result:
|
||||
return None
|
||||
next_pos, chunk = result
|
||||
buffer[pos : pos + len(chunk)] = chunk
|
||||
pos = next_pos
|
||||
if on_progress:
|
||||
on_progress(pos, mem_size)
|
||||
return bytes(buffer)
|
||||
|
||||
def send_keepalive(self) -> bool:
|
||||
"""发送链路保持(装置端 KEEPLIVE 不回复,仅刷新计时器)"""
|
||||
return self._send(CMD_KEEPLIVE)
|
||||
|
||||
|
||||
def mono_to_image(data: bytes, width: int, height: int) -> "Optional[object]":
|
||||
"""单色显存转 PIL Image(1bit -> 黑白图)"""
|
||||
try:
|
||||
from PIL import Image
|
||||
except ImportError:
|
||||
return None
|
||||
img = Image.new("1", (width, height))
|
||||
pix = img.load()
|
||||
for y in range(height):
|
||||
for x in range(width):
|
||||
byte_idx = (y * width + x) // 8
|
||||
bit_idx = 7 - (x % 8)
|
||||
if byte_idx < len(data):
|
||||
pix[x, y] = 1 if (data[byte_idx] >> bit_idx) & 1 else 0
|
||||
else:
|
||||
pix[x, y] = 0
|
||||
return img
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="远程显示通信工具")
|
||||
parser.add_argument("host", help="装置 IP 地址")
|
||||
parser.add_argument("-p", "--port", type=int, default=PORT, help=f"端口 (默认 {PORT})")
|
||||
parser.add_argument("--init", action="store_true", help="仅获取初始化信息")
|
||||
parser.add_argument("--screen", action="store_true", help="拉取显存并保存为 screen.png")
|
||||
parser.add_argument("--key", type=str, help="发送按键: U/D/L/R/ENT/ESC/F1/F2")
|
||||
parser.add_argument("--keepalive", action="store_true", help="发送一次 KEEPLIVE")
|
||||
args = parser.parse_args()
|
||||
|
||||
key_map = {
|
||||
"U": KEY_U, "D": KEY_D, "L": KEY_L, "R": KEY_R,
|
||||
"ENT": KEY_ENT, "ESC": KEY_ESC, "F1": KEY_F1, "F2": KEY_F2,
|
||||
}
|
||||
|
||||
client = RemoDispClient(args.host, args.port)
|
||||
if not client.connect():
|
||||
return 1
|
||||
|
||||
try:
|
||||
if args.init:
|
||||
info = client.request_init()
|
||||
if info:
|
||||
print("初始化信息:")
|
||||
print(f" LCD: {info['width']}x{info['height']}, 类型={info['lcd_type']}")
|
||||
print(f" 显存大小: {info['mem_size']} 字节")
|
||||
print(f" 按键映射: {info['keys']}")
|
||||
else:
|
||||
print("获取初始化失败")
|
||||
return 1
|
||||
|
||||
elif args.key:
|
||||
k = key_map.get(args.key.upper(), KEY_NONE)
|
||||
if k == KEY_NONE:
|
||||
print(f"未知按键: {args.key}")
|
||||
return 1
|
||||
if client.send_key(k):
|
||||
print(f"已发送按键: {args.key}")
|
||||
else:
|
||||
print("发送按键失败")
|
||||
return 1
|
||||
|
||||
elif args.keepalive:
|
||||
if client.send_keepalive():
|
||||
print("已发送 KEEPLIVE")
|
||||
else:
|
||||
print("发送失败")
|
||||
return 1
|
||||
|
||||
elif args.screen:
|
||||
def progress(cur, total):
|
||||
print(f"\r拉取显存: {cur}/{total} ({100*cur//total}%)", end="")
|
||||
|
||||
data = client.fetch_full_screen(on_progress=progress)
|
||||
if not data:
|
||||
print("\n拉取显存失败")
|
||||
return 1
|
||||
info = client._init_info or {}
|
||||
w = info.get("width", 160)
|
||||
h = info.get("height", 160)
|
||||
img = mono_to_image(data, w, h)
|
||||
if img:
|
||||
img.save("screen.png")
|
||||
print(f"\n已保存 screen.png ({w}x{h})")
|
||||
else:
|
||||
with open("screen.raw", "wb") as f:
|
||||
f.write(data)
|
||||
print(f"\n已保存 screen.raw ({len(data)} 字节,需 PIL 才能转 PNG)")
|
||||
|
||||
else:
|
||||
# 默认:获取初始化并拉取一帧显存
|
||||
info = client.request_init()
|
||||
if info:
|
||||
print(f"LCD {info['width']}x{info['height']}, 显存 {info['mem_size']} 字节")
|
||||
result = client.request_lcdmem(0)
|
||||
if result:
|
||||
next_pos, chunk = result
|
||||
print(f"收到显存 {len(chunk)} 字节,下一位置 {next_pos}")
|
||||
else:
|
||||
print("召唤显存失败")
|
||||
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit(main())
|
||||
@@ -1,272 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
远程显示查看器 - 带 GUI 的实时显存查看与按键模拟
|
||||
|
||||
用法: python remo_disp_viewer.py <装置IP>
|
||||
"""
|
||||
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
import sys
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, messagebox
|
||||
from typing import Optional, Tuple
|
||||
|
||||
# 协议常量
|
||||
TAG_CLIENT = 0xAA
|
||||
TAG_DEVICE = 0xBB
|
||||
PORT = 7003
|
||||
CMD_KEEPLIVE = 0
|
||||
CMD_INIT = 1
|
||||
CMD_KEY = 2
|
||||
CMD_LCDMEM = 3
|
||||
|
||||
KEY_U, KEY_D, KEY_L, KEY_R = 0x02, 0x40, 0x10, 0x08
|
||||
KEY_ENT, KEY_ESC, KEY_F1, KEY_F2 = 0x20, 0x01, 0x04, 0x80
|
||||
|
||||
|
||||
def calc_crc(data: bytes) -> int:
|
||||
crc = 0
|
||||
for b in data:
|
||||
crc ^= b
|
||||
return crc & 0xFF
|
||||
|
||||
|
||||
def build_frame(cmd: int, data: bytes) -> bytes:
|
||||
length = len(data)
|
||||
header = bytes([TAG_CLIENT, cmd & 0xFF, (length >> 8) & 0xFF, length & 0xFF])
|
||||
return header + data + bytes([calc_crc(data)])
|
||||
|
||||
|
||||
def parse_frame(raw: bytes) -> Optional[Tuple[int, bytes]]:
|
||||
if len(raw) < 6 or raw[0] != TAG_DEVICE:
|
||||
return None
|
||||
length = (raw[2] << 8) | raw[3]
|
||||
if len(raw) < 5 + length + 1:
|
||||
return None
|
||||
data = raw[4:4 + length]
|
||||
if calc_crc(data) != raw[4 + length]:
|
||||
return None
|
||||
return (raw[1], data)
|
||||
|
||||
|
||||
class RemoDispViewer:
|
||||
def __init__(self, host: str, port: int = PORT):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self._sock: Optional[socket.socket] = None
|
||||
self._init_info: Optional[dict] = None
|
||||
self._running = False
|
||||
self._refresh_thread: Optional[threading.Thread] = None
|
||||
|
||||
def connect(self) -> bool:
|
||||
try:
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.settimeout(3.0)
|
||||
self._sock.connect((self.host, self.port))
|
||||
self._sock.settimeout(0.5)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
self._running = False
|
||||
if self._sock:
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._sock = None
|
||||
|
||||
def _send(self, cmd: int, data: bytes = b"") -> bool:
|
||||
if not self._sock:
|
||||
return False
|
||||
try:
|
||||
self._sock.sendall(build_frame(cmd, data))
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _recv(self) -> Optional[Tuple[int, bytes]]:
|
||||
if not self._sock:
|
||||
return None
|
||||
try:
|
||||
header = self._sock.recv(5)
|
||||
if len(header) < 5:
|
||||
return None
|
||||
length = (header[2] << 8) | header[3]
|
||||
rest = self._sock.recv(length + 1)
|
||||
if len(rest) < length + 1:
|
||||
return None
|
||||
return parse_frame(header + rest)
|
||||
except (socket.timeout, BlockingIOError):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def request_init(self) -> Optional[dict]:
|
||||
if not self._send(CMD_INIT):
|
||||
return None
|
||||
for _ in range(20):
|
||||
result = self._recv()
|
||||
if result and result[0] == CMD_INIT:
|
||||
data = result[1]
|
||||
if len(data) >= 29:
|
||||
self._init_info = {
|
||||
"width": (data[0] << 8) | data[1],
|
||||
"height": (data[2] << 8) | data[3],
|
||||
"mem_size": (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | data[8],
|
||||
}
|
||||
return self._init_info
|
||||
time.sleep(0.05)
|
||||
return None
|
||||
|
||||
def send_key(self, key: int) -> bool:
|
||||
return self._send(CMD_KEY, bytes([key]))
|
||||
|
||||
def fetch_screen(self) -> Optional[bytes]:
|
||||
info = self._init_info
|
||||
if not info:
|
||||
info = self.request_init()
|
||||
if not info:
|
||||
return None
|
||||
mem_size = info["mem_size"]
|
||||
buffer = bytearray(mem_size)
|
||||
pos = 0
|
||||
while pos < mem_size:
|
||||
if not self._send(CMD_LCDMEM, struct.pack(">I", pos)):
|
||||
return None
|
||||
for _ in range(30):
|
||||
result = self._recv()
|
||||
if result and result[0] == CMD_LCDMEM:
|
||||
payload = result[1]
|
||||
if len(payload) >= 4:
|
||||
start = struct.unpack(">I", payload[:4])[0]
|
||||
chunk = payload[4:]
|
||||
buffer[start : start + len(chunk)] = chunk
|
||||
pos = start + len(chunk)
|
||||
break
|
||||
time.sleep(0.02)
|
||||
else:
|
||||
return None
|
||||
return bytes(buffer)
|
||||
|
||||
|
||||
class ViewerApp:
|
||||
def __init__(self, host: str):
|
||||
self.host = host
|
||||
self.client = RemoDispViewer(host)
|
||||
self.root = tk.Tk()
|
||||
self.root.title(f"远程显示 - {host}")
|
||||
self.root.geometry("400x320")
|
||||
self._canvas = None
|
||||
self._photo = None
|
||||
self._scale = 2
|
||||
self._build_ui()
|
||||
|
||||
def _build_ui(self):
|
||||
# 连接区
|
||||
frm = ttk.Frame(self.root, padding=5)
|
||||
frm.pack(fill=tk.X)
|
||||
ttk.Label(frm, text=f"目标: {self.host}:7003").pack(side=tk.LEFT)
|
||||
self._status = ttk.Label(frm, text="未连接")
|
||||
self._status.pack(side=tk.RIGHT)
|
||||
|
||||
# 画布
|
||||
self._canvas = tk.Canvas(self.root, width=320, height=320, bg="black")
|
||||
self._canvas.pack(padx=5, pady=5)
|
||||
|
||||
# 按键区
|
||||
btn_frm = ttk.Frame(self.root, padding=5)
|
||||
btn_frm.pack(fill=tk.X)
|
||||
keys = [
|
||||
("↑", KEY_U), ("↓", KEY_D), ("←", KEY_L), ("→", KEY_R),
|
||||
("确认", KEY_ENT), ("取消", KEY_ESC), ("F1", KEY_F1), ("F2", KEY_F2),
|
||||
]
|
||||
for label, key in keys:
|
||||
ttk.Button(btn_frm, text=label, width=6,
|
||||
command=lambda k=key: self._on_key(k)).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
# 刷新
|
||||
ttk.Button(btn_frm, text="刷新", command=self._refresh).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
self.root.protocol("WM_DELETE_WINDOW", self._on_close)
|
||||
|
||||
def _connect(self) -> bool:
|
||||
if self.client.connect():
|
||||
if self.client.request_init():
|
||||
self._status.config(text="已连接")
|
||||
return True
|
||||
self.client.disconnect()
|
||||
self._status.config(text="连接失败")
|
||||
return False
|
||||
|
||||
def _on_key(self, key: int):
|
||||
if not self.client._sock:
|
||||
if not self._connect():
|
||||
messagebox.showerror("错误", "请先连接装置")
|
||||
return
|
||||
self.client.send_key(key)
|
||||
|
||||
def _refresh(self):
|
||||
if not self.client._sock:
|
||||
if not self._connect():
|
||||
messagebox.showerror("错误", "连接失败")
|
||||
return
|
||||
data = self.client.fetch_screen()
|
||||
if not data:
|
||||
messagebox.showerror("错误", "拉取显存失败")
|
||||
return
|
||||
info = self.client._init_info
|
||||
w, h = info["width"], info["height"]
|
||||
# 单色转图像
|
||||
try:
|
||||
from PIL import Image, ImageTk
|
||||
img = Image.new("1", (w, h))
|
||||
pix = img.load()
|
||||
for y in range(h):
|
||||
for x in range(w):
|
||||
bi = (y * w + x) // 8
|
||||
bit = 7 - (x % 8)
|
||||
pix[x, y] = 255 if (bi < len(data) and (data[bi] >> bit) & 1) else 0
|
||||
img = img.resize((w * self._scale, h * self._scale), Image.NEAREST)
|
||||
self._photo = ImageTk.PhotoImage(img)
|
||||
self._canvas.delete("all")
|
||||
self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo)
|
||||
except ImportError:
|
||||
# 无 PIL:用简单矩形模拟
|
||||
self._canvas.delete("all")
|
||||
step = max(1, 320 // w)
|
||||
for y in range(min(h, 160)):
|
||||
for x in range(min(w, 160)):
|
||||
bi = (y * w + x) // 8
|
||||
bit = 7 - (x % 8)
|
||||
if bi < len(data) and (data[bi] >> bit) & 1:
|
||||
self._canvas.create_rectangle(
|
||||
x * step, y * step, (x + 1) * step, (y + 1) * step,
|
||||
fill="white", outline=""
|
||||
)
|
||||
|
||||
def _on_close(self):
|
||||
self.client.disconnect()
|
||||
self.root.destroy()
|
||||
|
||||
def run(self):
|
||||
self._connect()
|
||||
self._refresh()
|
||||
self.root.mainloop()
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("用法: python remo_disp_viewer.py <装置IP>")
|
||||
sys.exit(1)
|
||||
app = ViewerApp(sys.argv[1])
|
||||
app.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user