
Mastering Screen Parsing: A Step-by-Step Guide to Locking macOS with GPT-4o Vision and OmniParser
- Rifx.Online
- Machine Learning, Deep Learning, AI Applications
- 08 Mar, 2025
Building a Screen Parsing Agent with GPT-4o and OmniParser
In this tutorial, we will build a screen parsing agent using GPT-4o and OmniParser. Our goal is to demonstrate using this agent to lock a computer, specifically a macOS device.
Here is a step-by-step breakdown of the process:
- Capture a screenshot: use PyAutoGUI to take a screenshot of the current screen state.
- Parse the screenshot: send the screenshot to OmniParser for analysis.
- Request the task: ask GPT-4o (vision) to perform a specific task, such as locking the computer.
- Evaluate task completion: receive the result from GPT-4o and determine whether the task is complete.
- Conditional action:
  - If the task is not complete, use PyAutoGUI to move the cursor to the given screen coordinates and simulate a click.
  - If the task is complete, terminate the process.
- Iterate: if the task is not complete, repeat the process from step 1.
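The loop above can be sketched as a small driver function. The `capture`, `parse`, `ask`, and `click` callables here are hypothetical stand-ins for the real steps implemented later in this tutorial:

```python
# Minimal sketch of the agent loop described above. The four callables
# are hypothetical placeholders for the real capture/parse/ask/click steps.
async def agent_loop(capture, parse, ask, click, max_iterations=5):
    for _ in range(max_iterations):
        shot = capture()           # step 1: screenshot
        boxes = parse(shot)        # step 2: OmniParser
        result = ask(shot, boxes)  # steps 3-4: GPT-4o verdict
        if result == "OK":         # task complete: stop
            return "OK"
        click(boxes)               # step 5: act, then iterate
    return "NA"
```

The real implementation below adds delays, error handling, and cleanup around the same structure.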
Flowchart
Parsing the Screenshot: OmniParser Integration
To mitigate the high memory usage I ran into across multiple iterations, I host the parsing code behind FastAPI. If you have other solutions or suggestions, feel free to share!
Below is the Python code responsible for step 2, parsing the screenshot with OmniParser.
from fastapi import FastAPI, File, UploadFile, HTTPException
from PIL import Image
import base64
import uuid
import os
import gc
import torch
from transformers import AutoProcessor, AutoModelForCausalLM
from transformers.dynamic_module_utils import get_imports
from ultralytics import YOLO
from utils import (
    check_ocr_box,
    get_yolo_model,
    get_caption_model_processor,
    get_som_labeled_img,
)
from unittest.mock import patch

app = FastAPI()

def fixed_get_imports(filename: str | os.PathLike) -> list[str]:
    # Florence-2's remote code imports flash_attn, which is unavailable on
    # macOS; drop it from the import list before the module is loaded.
    if not str(filename).endswith("modeling_florence2.py"):
        return get_imports(filename)
    imports = get_imports(filename)
    if "flash_attn" in imports:
        imports.remove("flash_attn")
    return imports

with patch("transformers.dynamic_module_utils.get_imports", fixed_get_imports):
    model = AutoModelForCausalLM.from_pretrained(
        "weights/icon_caption_florence", torch_dtype=torch.float16, trust_remote_code=True
    ).to("mps")
processor = AutoProcessor.from_pretrained("microsoft/Florence-2-base", trust_remote_code=True)
caption_model_processor = {'processor': processor, 'model': model}
yolo_model = YOLO('weights/icon_detect/best.pt').to('mps')
BOX_TRESHOLD = 0.05

@app.post("/process-image")
async def process_image(file: UploadFile = File(...)):
    """
    Process an uploaded image and return OCR, labeled image, and parsed content.
    """
    try:
        contents = await file.read()
        temp_path = f"{uuid.uuid4()}.jpg"
        with open(temp_path, "wb") as temp_file:
            temp_file.write(contents)
        image = Image.open(temp_path)
        box_overlay_ratio = image.size[0] / 3200
        draw_bbox_config = {
            'text_scale': 0.8 * box_overlay_ratio,
            'text_thickness': max(int(2 * box_overlay_ratio), 1),
            'text_padding': max(int(3 * box_overlay_ratio), 1),
            'thickness': max(int(3 * box_overlay_ratio), 1),
        }
        ocr_bbox_rslt, is_goal_filtered = check_ocr_box(
            temp_path, display_img=False, output_bb_format='xyxy', goal_filtering=None,
            easyocr_args={'paragraph': False, 'text_threshold': 0.9}, use_paddleocr=True
        )
        text, ocr_bbox = ocr_bbox_rslt
        dino_labeled_img, label_coordinates, parsed_content_list = get_som_labeled_img(
            temp_path, yolo_model, BOX_TRESHOLD=BOX_TRESHOLD, output_coord_in_ratio=True,
            ocr_bbox=ocr_bbox, draw_bbox_config=draw_bbox_config,
            caption_model_processor=caption_model_processor, ocr_text=text, iou_threshold=0.1
        )
        filename = f"{uuid.uuid4()}.jpg"
        with open(filename, 'wb') as f:
            f.write(base64.b64decode(dino_labeled_img))
        print(f"Image saved as {filename}")
        gc.collect()
        return {
            "filename": filename,
            "label_coordinates": label_coordinates,
            "parsed_content": parsed_content_list,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")
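The code above pins both models to `"mps"`, which only exists on Apple Silicon. A hypothetical helper like the following (not part of the original code) shows one way to fall back gracefully; in practice you would pass `torch.backends.mps.is_available()` and `torch.cuda.is_available()` as the two flags:

```python
def pick_device(mps_ok: bool, cuda_ok: bool) -> str:
    """Return the torch device string to use, preferring Apple's MPS.

    The availability flags are injected so the logic is easy to test;
    in real code they come from torch.backends.mps.is_available() and
    torch.cuda.is_available().
    """
    if mps_ok:
        return "mps"
    if cuda_ok:
        return "cuda"
    return "cpu"
```

You could then load the models with `.to(pick_device(...))` instead of hard-coding `"mps"`.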
To run the code, use the following command:
uvicorn fastop:app
Calling the API via curl
You can call this API with a curl command, passing a UI screenshot as the image file.
curl -X POST "http://127.0.0.1:8000/process-image" \
-H "Content-Type: multipart/form-data" \
-F "file=@imgs/optimized_screenshot.jpg"
The API call returns a JSON response with the following key-value pairs:
- filename: a randomly generated filename for the image that will be sent to the LLM.
'filename': '438ce778-a516-4d28-87b5-80a8bd618f66.jpg'
- label_coordinates: the screen coordinates of the detected labels, as normalized [x, y, width, height] boxes (the endpoint calls get_som_labeled_img with output_coord_in_ratio=True).
'label_coordinates': {
'0': [
0.02806712521447076,
0.006714412754096405,
0.04079861111111111,
0.022829006266786033
],
'1': [
0.07291666666666667,
0.008057296329453895,
0.022858796296296297,
0.018352730528200537
],
'2': [
0.10011574074074074,
0.008057296329453895,
0.019675925925925927,
0.018352730528200537
],
...
}
- parsed_content: a list of the parsed content/text extracted from the image.
'parsed_content': [
    'Text Box ID 0: Terminal',
    'Text Box ID 1: Shell',
    'Text Box ID 2: Edit',
    ...
]
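Since the coordinates are normalized, they must be scaled by the screen size before clicking. A small sketch of the pixel-center conversion (the same arithmetic used later in `move_mouse_in_background`):

```python
def box_center_px(box, screen_w, screen_h):
    """Convert a normalized [x, y, width, height] box, as returned in
    label_coordinates, to the pixel center of that box on screen."""
    x, y, w, h = box
    return ((x + w / 2) * screen_w, (y + h / 2) * screen_h)
```

For example, a box of `[0.25, 0.5, 0.5, 0.25]` on a 1000x800 screen centers at `(500.0, 500.0)`.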
Capturing the Screenshot, Requesting the Task, Evaluating Completion, Conditional Action, and Iteration
Now we will write the Python code that carries out the remaining steps:
- Capture a screenshot
- Request the task
- Evaluate task completion
- Conditional action, and
- Iteration
The process_task() function is the main function that orchestrates the following steps:
- Capture a screenshot.
- Send the screenshot to OmniParser for analysis.
- Ask the LLM to perform this task:
  find the icon to lock computer on Mac by following these steps: 1. Find "Apple" icon on menu bar, then 2. Find "Lock screen"
- Here is the full prompt:
  Your task is {question}. Which bounding box label should you operate on?
  ONLY after you have completed the steps required by the task, set the result to "OK". If the task is not completed or only partially completed, set the result to "NA".
  Provide your answer in JSON format. Example: {{ "BoundingBoxID": 12, "Description": "description", "Result": "result" }}
- If the task is complete (i.e. the result equals OK), exit the loop; otherwise start the next iteration.
Example response when the task is complete:
{
"BoundingBoxID": 21,
"Description": "Lock Screen",
"Result": "OK"
}
Example response when the task is not yet complete:
{
"BoundingBoxID": 55,
"Description": "Apple icon",
"Result": "NA"
}
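Because the model often wraps its JSON answer in Markdown code fences, the reply has to be cleaned before parsing. A minimal sketch of that cleanup, mirroring what the `LLM()` function does later:

```python
import json

def parse_llm_json(raw: str) -> dict:
    """Strip optional ```json fences from a model reply and parse it."""
    cleaned = raw.replace("```json", "").replace("```", "").strip()
    return json.loads(cleaned)
```

This handles both fenced and bare JSON replies; a malformed reply still raises `json.JSONDecodeError`, which you may want to catch and retry.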
Here is the process_task() function.
async def main():
    num_tasks = 2
    delay_between_tasks = 3
    for _ in range(num_tasks):
        Result = await process_task()
        if Result == "OK":
            print("Exiting loop due to OK result")
            break
        print("--delay--")
        await asyncio.sleep(delay_between_tasks)
        gc.collect()

async def process_task():
    print("Screenshot")
    screenshot = await capture_screen()
    print("Waiting for screenshot to save...")
    await asyncio.sleep(1)
    print("Waiting for parsing to complete...")
    filename, parsed_content_list, label_coordinates = await asyncio.to_thread(parsing)
    await asyncio.sleep(1)
    print("LLM")
    Result = await LLM(filename, parsed_content_list, label_coordinates)
    del screenshot, filename, parsed_content_list, label_coordinates
    return Result
Here is the complete code.
import requests
from PIL import Image
import json
import os
import base64
import asyncio
from io import BytesIO
import backoff
import pyautogui
import time
import gc
from openai import OpenAIError, AsyncOpenAI

baseurl = ""
apikey = ""
os.environ["OPENAI_API_BASE"] = baseurl
os.environ["OPENAI_API_KEY"] = apikey
clienta = AsyncOpenAI(api_key=apikey, base_url=baseurl)
MODEL = "gpt-4o"

async def capture_screen():
    def capture():
        screenshot = pyautogui.screenshot()
        if screenshot.mode == "RGBA":
            screenshot = screenshot.convert("RGB")
        screenshot.save("optimized_screenshot.jpg", "JPEG", quality=55, optimize=True)
        return "optimized_screenshot.jpg"
    return await asyncio.to_thread(capture)

@backoff.on_exception(backoff.expo, OpenAIError)
async def parse_image_with_gpt(base64_image: str, screen_elements: str, coordinates: str, question: str) -> str:
    """
    Parse image using GPT.
    Args:
        base64_image: base64 encoded image string
    """
    messages = [
        {
            "role": "system",
            "content": f"""
Here is a UI screenshot image with bounding boxes and corresponding labeled IDs overlayed on top of it, and here is a list of icon/text box descriptions:
{screen_elements}
"""
        },
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"""
Your task is {question}. Which bounding box label should you operate on?
ONLY after you have completed the steps required by the task, set the result to "OK". If the task is not completed or partially completed, set the result to "NA".
Provide your answer in JSON format. Example:
{{
    "BoundingBoxID": 12,
    "Description": "description",
    "Result": "result"
}}
"""},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}",
                        "detail": "high"
                    },
                },
            ],
        }
    ]
    response = await clienta.beta.chat.completions.parse(
        model=MODEL,
        messages=messages,
        temperature=0
    )
    choice = response.choices[0]
    content = choice.message.content or ""
    return content

async def image_analysis(image_path: str, screen_elements: str, coordinates: str, question: str) -> str:
    """
    Image Understanding
    Args:
        image_path: path to image file
    """
    image = Image.open(image_path)
    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    img_byte = buffered.getvalue()
    img_base64 = base64.b64encode(img_byte).decode("utf-8")
    text_of_image = await parse_image_with_gpt(img_base64, screen_elements, coordinates, question)
    return text_of_image

def move_mouse_in_background(x_norm, y_norm, width_norm, height_norm):
    # The box coordinates are normalized, so scale them by the screen size
    # and click the center of the box.
    time.sleep(0.1)
    screen_width, screen_height = pyautogui.size()
    center_x = (x_norm + width_norm / 2) * screen_width
    center_y = (y_norm + height_norm / 2) * screen_height
    pyautogui.moveTo(center_x, center_y, duration=0.6)
    print(f"Mouse moved to: ({center_x}, {center_y})")
    pyautogui.click(x=center_x, y=center_y)

def parsing():
    url = "http://127.0.0.1:8000/process-image"
    file_path = "optimized_screenshot.jpg"
    with open(file_path, "rb") as f:
        response = requests.post(url, files={"file": f})
    print("Status Code:", response.status_code)
    print("Response Body:", response.json())
    res = response.json()
    filename = res["filename"]
    parsed_content_list = res["parsed_content"]
    label_coordinates = res["label_coordinates"]
    return filename, parsed_content_list, label_coordinates

async def LLM(filename, parsed_content_list, label_coordinates):
    question = """
    find the icon to lock computer on Mac by following these steps:
    1. Find "Apple" icon on menu bar, then
    2. Find "Lock screen"
    """
    result = await image_analysis(filename, parsed_content_list, label_coordinates, question)
    print(result)
    cleaned_data = result.replace("```json", "").replace("```", "").strip()
    parsed_data = json.loads(cleaned_data)
    boundingID = parsed_data["BoundingBoxID"]
    print(boundingID)
    Result = parsed_data["Result"]
    boundingIDStr = str(boundingID)
    lc = label_coordinates[boundingIDStr]
    move_mouse_in_background(lc[0], lc[1], lc[2], lc[3])
    return Result

async def main():
    num_tasks = 2
    delay_between_tasks = 3
    for _ in range(num_tasks):
        Result = await process_task()
        if Result == "OK":
            print("Exiting loop due to OK result")
            break
        print("--delay--")
        await asyncio.sleep(delay_between_tasks)
        gc.collect()
Asynchronous Task Execution Framework
import asyncio
import gc

async def capture_screen():
    """
    Simulate the screenshot step.
    """
    await asyncio.sleep(1)
    return "screenshot_data"

def parsing(screenshot_data):
    """
    Simulate the parsing step.
    """
    import time
    time.sleep(1)
    return "filename", ["parsed_content"], [(10, 20, 30, 40)]

async def LLM(filename, parsed_content_list, label_coordinates):
    """
    Simulate the LLM call.
    """
    await asyncio.sleep(1)
    return "OK"

async def main():
    delay_between_tasks = 2
    while True:
        print("Starting task...")
        Result = await process_task()
        print("Task completed, result:", Result)
        if Result == "OK":
            print("Exiting loop due to OK result")
            break
        print("--delay--")
        await asyncio.sleep(delay_between_tasks)
        gc.collect()

async def process_task():
    print("Screenshot")
    screenshot = await capture_screen()
    print("Waiting for screenshot to save...")
    await asyncio.sleep(1)
    print("Waiting for parsing to complete...")
    filename, parsed_content_list, label_coordinates = await asyncio.to_thread(parsing, screenshot)
    await asyncio.sleep(1)
    print("LLM")
    Result = await LLM(filename, parsed_content_list, label_coordinates)
    del screenshot, filename, parsed_content_list, label_coordinates
    return Result

if __name__ == "__main__":
    asyncio.run(main())
With this implementation, you can leverage the power of an agentic framework to execute a wide variety of tasks seamlessly.