Type something to search...
掌握屏幕解析:使用 GPT-4o Vision 和 Omniparser 锁定 macOS 的逐步指南

掌握屏幕解析:使用 GPT-4o Vision 和 Omniparser 锁定 macOS 的逐步指南

使用 GPT-4o 和 OmniParser 构建屏幕解析代理

在本教程中,我们将探讨使用 GPT-4o 和 OmniParser 构建屏幕解析代理。我们的目标是演示使用此代理锁定计算机,特别是 MacOS 设备。

以下是该过程的逐步分解:

  1. 捕获屏幕截图:使用 PyAutoGUI 截取当前屏幕状态的屏幕截图。
  2. 解析屏幕截图:将屏幕截图发送到 OmniParser 进行分析。
  3. 任务请求:使用 LLM GPT-4o Vision 执行特定任务,例如锁定计算机。
  4. 评估任务完成情况:从 LLM GPT-4o 接收结果并确定任务是否完成。
  5. 条件操作
    • 如果任务未完成,使用 PyAutoGUI 将光标移动到特定的屏幕坐标并模拟点击。
    • 如果任务已完成,则终止流程。
  6. 迭代:如果任务未完成,则从步骤 1 重复该过程。

流程图

解析屏幕截图:OmniParser 集成实现

为了缓解多次迭代期间遇到的高内存使用问题,我部署了 FastAPI 来托管解析代码。如果您有其他解决方案或建议,请随时分享!

以下是负责在步骤 2 — 解析屏幕截图中使用 OmniParser 进行解析的 Python 代码。

from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from PIL import Image
import base64
import uuid
import os
import asyncio
import torch
import gc
from transformers import AutoProcessor, AutoModelForCausalLM
from transformers.dynamic_module_utils import get_imports
from ultralytics import YOLO
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain import hub
from utils import (
    check_ocr_box,
    get_yolo_model,
    get_caption_model_processor,
    get_som_labeled_img,
)
from unittest.mock import patch

app = FastAPI()

def fixed_get_imports(filename: str | os.PathLike) -> list[str]:
    print(filename)
    if not str(filename).endswith("modeling_florence2.py"):
        return get_imports(filename)
    imports = get_imports(filename)
    if "flash_attn" in imports:  
        imports.remove("flash_attn")
    return imports

model_path = "microsoft/Florence-2-base"

with patch("transformers.dynamic_module_utils.get_imports", fixed_get_imports): 
    model = AutoModelForCausalLM.from_pretrained("weights/icon_caption_florence", torch_dtype=torch.float16, trust_remote_code=True).to("mps")

processor = AutoProcessor.from_pretrained("microsoft/Florence-2-base", trust_remote_code=True)
caption_model_processor = {'processor': processor, 'model': model}

yolo_model = YOLO('weights/icon_detect/best.pt').to('mps')
BOX_TRESHOLD = 0.05

@app.post("/process-image")
async def process_image(file: UploadFile = File(...)):
    """
    Process an uploaded image and return OCR, labeled image, and parsed content.
    """
    try:
        contents = await file.read()
        temp_filename = f"{uuid.uuid4()}.jpg"  
        temp_path = temp_filename

        with open(temp_path, "wb") as temp_file:
            temp_file.write(contents)

        image = Image.open(temp_path)
        image_rgb = image.convert('RGB')
        box_overlay_ratio = image.size[0] / 3200
        draw_bbox_config = {
            'text_scale': 0.8 * box_overlay_ratio,
            'text_thickness': max(int(2 * box_overlay_ratio), 1),
            'text_padding': max(int(3 * box_overlay_ratio), 1),
            'thickness': max(int(3 * box_overlay_ratio), 1),
        }

        ocr_bbox_rslt, is_goal_filtered = check_ocr_box(temp_path, display_img = False, output_bb_format='xyxy', goal_filtering=None, easyocr_args={'paragraph': False, 'text_threshold':0.9}, use_paddleocr=True)
        text, ocr_bbox = ocr_bbox_rslt

        dino_labeled_img, label_coordinates, parsed_content_list = get_som_labeled_img(temp_path, yolo_model, BOX_TRESHOLD = BOX_TRESHOLD, output_coord_in_ratio=True, ocr_bbox=ocr_bbox,draw_bbox_config=draw_bbox_config, caption_model_processor=caption_model_processor, ocr_text=text,iou_threshold=0.1)

        filename = f"{uuid.uuid4()}.jpg"
        with open(filename, 'wb') as f:
            f.write(base64.b64decode(dino_labeled_img))

        print(f"Image saved as {filename}")
        gc.collect()

        return {
            "filename": filename,
            "label_coordinates": label_coordinates,
            "parsed_content": parsed_content_list
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")

要运行代码,请使用以下命令:

uvicorn fastop:app

通过 Curl 调用 API

您可以使用 Curl 命令调用此 API,将 UI 屏幕截图作为图像文件传递。

curl -X POST "http://127.0.0.1:8000/process-image" \
-H "Content-Type: multipart/form-data" \
-F "file=@imgs/optimized_screenshot.jpg"

API 调用将返回一个 JSON 响应,其中包含以下键值对:

  • filename:要发送到 LLM 的图像的随机生成的文件名。
'filename': '438ce778-a516-4d28-87b5-80a8bd618f66.jpg'
  • label_coordinates:检测到的标签的屏幕坐标。
'label_coordinates': {
    '0': [
      0.02806712521447076,
      0.006714412754096405,
      0.04079861111111111,
      0.022829006266786033
    ],
    '1': [
      0.07291666666666667,
      0.008057296329453895,
      0.022858796296296297,
      0.018352730528200537
    ],
    '2': [
      0.10011574074074074,
      0.008057296329453895,
      0.019675925925925927,
      0.018352730528200537
    ],
    ...
}
  • parsed_content:从图像中提取的已解析内容/文本的列表。
'parsed_content': [
    'Text Box ID 0: Terminal',
    'Text Box ID 1: Shell',
    'Text Box ID 2: Edit',

截取屏幕截图、任务请求、评估任务完成情况、条件操作和迭代

现在,我们将创建 Python 代码来执行剩余的流程:

  • 截取屏幕截图
  • 任务请求
  • 评估任务完成情况
  • 条件操作和
  • 迭代

process_task() 函数是编排以下步骤的主要函数:

  • 截取屏幕截图。
  • 将屏幕截图发送到 OmniParser 进行分析。
  • 使用此任务询问 LLM:

按照以下步骤在 Mac 上找到锁定计算机的图标:

  1. 在菜单栏上找到“Apple”图标,然后 2. 找到“锁定屏幕”
  • 这是完整的提示:

您的任务是 {question}。您应该对哪个边界框标签进行操作?

仅在您完成任务所需的步骤后,将结果设置为“OK”。如果任务未完成或部分完成,请将结果设置为“NA”。

以 JSON 格式提供您的答案。示例:{{ “BoundingBoxID”: 12, “Description”: “description”, “Result”: “result” }}

  • 如果任务已完成(例如,结果等于 OK),则结束循环,否则进入下一次迭代。

任务完成时的示例响应:

{
    "BoundingBoxID": 21,
    "Description": "Lock Screen",
    "Result": "OK"
}

任务尚未完成时的示例响应:

{
    "BoundingBoxID": 55,
    "Description": "Apple 
icon",
    "Result": "NA"
}

这是 process_task() 函数。

async def main():

    num_tasks = 2
    delay_between_tasks = 3

    for _ in range(num_tasks):
        Result = await process_task()
        if Result == "OK":
            print("Exiting loop due to NA result")
            break

        print("--delay--")
        await asyncio.sleep(delay_between_tasks)

    gc.collect()

async def process_task():
    print("Screenshot")
    screenshot = await capture_screen()

    print("Waiting for screenshot to save...")
    await asyncio.sleep(1)

    print("Waiting for parsing to complete...")
    filename, parsed_content_list, label_coordinates = await asyncio.to_thread(parsing)

    await asyncio.sleep(1)

    print("LLM")
    Result = await LLM(filename, parsed_content_list, label_coordinates)

    del screenshot, filename, parsed_content_list, label_coordinates

    return Result

以下是完整的代码。

import requests
from PIL import Image
import re
import tempfile
import json
import os
import base64
import asyncio
import random
import uuid
from io import BytesIO
import backoff
import io
import pyautogui
import time
from PIL import Image, ImageDraw, ImageFont
import io
import gc
from openai import OpenAIError, AsyncOpenAI
from langchain_openai import ChatOpenAI

baseurl = ""
apikey = ""
os.environ["OPENAI_API_BASE"] = baseurl
os.environ["OPENAI_API_KEY"] = apikey
clienta = AsyncOpenAI(api_key=apikey,  base_url=baseurl)
MODEL = "gpt-4o"

async def capture_screen():
    def capture():
        screenshot = pyautogui.screenshot()
        if screenshot.mode == "RGBA":
            screenshot = screenshot.convert("RGB")
        screenshot.save("optimized_screenshot.jpg", "JPEG", quality=55, optimize=True)
        return "optimized_screenshot.jpg"
    return await asyncio.to_thread(capture)

@backoff.on_exception(backoff.expo, OpenAIError)
async def parse_image_with_gpt(base64_image: str, screen_elements: str, coordinates: str, question: str) -> dict:
    """
    Parse image using GPT.

    Args:
        base64_image: base64 encoded image string
    """
    messages = [
        {
            "role": "system",
            "content": f"""

Here is a UI screenshot image with bounding boxes and corresponding labeled ID overlayed on top of it, and here is a list of icon/text box description: 
            {screen_elements} \n

"""
        },
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"""

Your task is {question}. Which bounding box label you should operate on?

ONLY after you completed the steps required by the task, set the result to "OK." If the task is not completed or partially completed, set the result to "NA."

Provide your answer in JSON format. Example:

{{
                    "BoundingBoxID": 12,
                    "Description": "description",
                    "Result": "result"
                }}

"""},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}",
                        "detail": "high"
                    },
                },
            ],
        }
    ]

    response = await clienta.beta.chat.completions.parse(
        model=MODEL,
        messages=messages,
        temperature=0
    )

    choice = response.choices[0]
    content = choice.message.content or "" 
    return content

async def image_analysis(image_path: str, screen_elements: str, coordinates: str, question: str) -> list:
    """
    Image Understanding

    Args:
        image_path: path to image file
    """

    image = Image.open(image_path)

    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    img_byte = buffered.getvalue()
    img_base64 = base64.b64encode(img_byte).decode("utf-8")

    text_of_image = await parse_image_with_gpt(img_base64, screen_elements, coordinates, question)

    return text_of_image

def move_mouse_in_background(x_norm, y_norm, width_norm, height_norm):

    time.sleep(0.1)

    screen_width, screen_height = pyautogui.size()

    center_x = (x_norm + width_norm / 2) * screen_width
    center_y = (y_norm + height_norm / 2) * screen_height

    pyautogui.moveTo(center_x, center_y, duration=0.6)  
    print(f"Mouse moved to: ({center_x}, {center_y})")

    pyautogui.click(x=center_x, y=center_y)

def parsing():
    url = "http://127.0.0.1:8000/process-image"

    file_path = "optimized_screenshot.jpg"
    files = {"file": open(file_path, "rb")}

    response = requests.post(url, files=files)

    print("Status Code:", response.status_code)
    print("Response Body:", response.json())

    res = response.json()

    filename = res["filename"]
    parsed_content_list = res["parsed_content"]
    label_coordinates = res["label_coordinates"]
    return filename, parsed_content_list, label_coordinates

async def LLM(filename, parsed_content_list, label_coordinates):
    llm = ChatOpenAI(model="gpt-4o")

    question = """ 
    find the icon to lock computer on Mac by following these steps:

1. Find "Apple" icon on menu bar, then
    2. Find "Lock screen"

"""

    result = await image_analysis(filename, parsed_content_list, label_coordinates, question)
    print(result)

    cleaned_data = result.replace("```json", "").replace("```", "").strip()
    parsed_data = json.loads(cleaned_data)

    boundingID = parsed_data["BoundingBoxID"]
    print(boundingID)

    Result = parsed_data["Result"]

    boundingIDStr = str(boundingID)
    lc = label_coordinates[boundingIDStr]

    move_mouse_in_background(lc[0],lc[1],lc[2],lc[3])
    return Result

async def main():

    num_tasks = 2
    delay_between_tasks = 3

    for _ in range

异步任务执行框架

import asyncio
import gc

async def capture_screen():
    """
    模拟截屏操作
    """
    await asyncio.sleep(1)
    return "screenshot_data"

def parsing(screenshot_data):
    """
    模拟解析操作
    """
    import time
    time.sleep(1)
    return "filename", ["parsed_content"], [(10, 20, 30, 40)]

async def LLM(filename, parsed_content_list, label_coordinates):
    """
    模拟LLM调用
    """
    await asyncio.sleep(1)
    return "OK"

async def main():
    delay_between_tasks = 2
    while True:
        print("Starting task...")
        Result = await process_task()
        print("Task completed, result:", Result)

        if Result == "OK":
            print("Exiting loop due to NA result")
            break

        print("--delay--")
        await asyncio.sleep(delay_between_tasks)

        gc.collect()

async def process_task():
    print("Screenshot")
    screenshot = await capture_screen()

    print("Waiting for screenshot to save...")
    await asyncio.sleep(1)

    print("Waiting for parsing to complete...")
    filename, parsed_content_list, label_coordinates = await asyncio.to_thread(parsing, screenshot)

    await asyncio.sleep(1)

    print("LLM")
    Result = await LLM(filename, parsed_content_list, label_coordinates)

    del screenshot, filename, parsed_content_list, label_coordinates

    return Result

if __name__ == "__main__":
    asyncio.run(main())

通过这种实现,您可以利用 agentic 框架的强大功能来无缝执行各种任务。

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...