Type something to search...
构建实时gemini 2.0移动应用:5个步骤实现语音聊天和图像共享功能

构建实时gemini 2.0移动应用:5个步骤实现语音聊天和图像共享功能

作者插图

在本教程中,我将分享我的最新项目,继续探索与 Gemini 2.0 及其 multi-modal live API 相关的实时应用。在通过多个网络应用项目深入了解 Gemini 2.0 的迷人实时能力后,包括摄像头聊天、屏幕共享互动画布和 RAG 助手,我花了一个周末构建了一个 Android 应用,以复制甚至增强与 Gemini 2.0 的多模态实时体验。以下是演示视频:

现在,让我们深入技术细节。和往常一样,我们将首先快速回顾 Gemini 2.0 提供的核心功能,并了解为什么 multi-modal live API 如此强大。Gemini 的主要优势在于其同时处理多种类型输入的能力,包括文本、音频和视频。这种多模态能力结合了 live API 的流媒体功能,使得用户可以在对话中自然、互动地进行交流,用户可以无缝切换说话、展示图像和接收响应,就像与真实的人交谈一样。live API 还能够优雅地处理中断,并在整个对话中保持上下文,这对于构建响应式移动应用程序是理想的。

然而,我们必须记住,这个 API 并不是 Google 官方的 Generative AI 套件的一部分,仍然被视为实验性。目前,它是免费的,API 密钥的速率限制为每个 API 密钥三条并发会话、每分钟 400 万 tokens,以及音频会话持续时间限制为 15 分钟,音频加视频会话限制为 2 分钟。没有可用的付费计划来增加这些限制。

系统架构

不再赘述,让我们构建这个应用程序。首先,我们需要了解这个应用程序的基本结构和数据流。请注意,这是初始版本,因此其功能并不复杂。我可能会在未来的教程中添加更高级的功能。

该过程始于用户在 Android 应用上,用户通过摄像头捕捉图像或通过麦克风录制声音。Android 应用随后处理这些媒体数据。图像被压缩并编码为 Base64,而音频则被捕获和处理。这些处理过的媒体数据,包括图像和音频,被打包成 JSON 格式,并通过 WebSocket 连接传输到我们的 Python 服务器。在服务器端,消息被接收,音频和图像数据被转发到 Gemini 2.0 的 multi-modal live API。模型处理媒体数据并生成响应,响应可以是音频或文本。然后,后端通过 WebSocket 将此响应数据流式传输回 Android 客户端,在那里音频响应被排队并播放。

代码讲解

现在,让我们开始编码。我将向您展示整个服务器代码,使用 Python,因为它很简短。对于 Android 应用,不用担心,我不会逐行讲解。相反,我会专注于使应用正常工作的关键组件。我尽量保持结构尽可能清晰,这样即使您不是 Android 专家,仍然可以很好地理解它的运作。如果您需要整个代码库,可以访问我的 GitHub 仓库找到它。

后端 Python 服务器

后端实现与我之前的网页应用项目非常相似,因为它作为前端与 multi-modal live API 之间的桥梁,无论是移动端还是网页都没有改变。因此,如果您已经阅读过我之前的教程,例如:

您可以跳过这一部分,直接进入后面关于 Android 应用的代码讲解。

初始化

对于用 Python 编写的服务器,请确保您已安装 google-genai 包。我使用的是最新版本 0.5.0,因此请确保您有相同的版本。同时,安装 websockets 包以便于服务器与客户端之间的通信。

我们使用 websockets 库创建一个服务器,监听来自我们的 Android 应用的传入连接。

import asyncio
import json
import os
import websockets
from google import genai
import base64

async def main() -> None:
    async with websockets.serve(gemini_session_handler, "0.0.0.0", 9084):
        print("Running websocket server 0.0.0.0:9084...")
        await asyncio.Future()  # Keep the server running indefinitely

在这里,服务器设置在特定端口,例如 9084。多模态处理逻辑在 gemini_session_handler() 中处理,这是每个新的 WebSocket 连接的核心。确保服务器监听 0.0.0.0 IP 地址而不是 localhost 是很重要的,因为这允许服务器暴露于外部网络。这是必要的,以便移动客户端可以从任何外部 IP 地址连接到它,因为移动客户端可能在不同的网络上运行。google-genai 库用于创建一个能够与 Gemini Multimodal API 通信的客户端,我们在其中指定我们的 API 密钥并配置模型 gemini-2.0-flash-exp,我们将用于处理多模态数据。gemini_session_handler()websockets.serve 调用,管理 Gemini API 会话的整个生命周期。

os.environ['GOOGLE_API_KEY'] = ''
MODEL = "gemini-2.0-flash-exp"

client = genai.Client(
  http_options={
    'api_version': 'v1alpha',
  }
)

gemini_session_handler()

这里是整个 gemini_session_handler() 函数定义:

async def gemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
    """Handles the interaction with Gemini API within a websocket session."""
    try:
        config_message = await client_websocket.recv()
        config_data = json.loads(config_message)
        config = config_data.get("setup", {})

        config["system_instruction"] = "You are a daily life assistant."

        async with client.aio.live.connect(model=MODEL, config=config) as session:
            print("Connected to Gemini API")

            async def send_to_gemini():
                """Sends messages from the client websocket to the Gemini API."""
                try:
                    async for message in client_websocket:
                        try:
                            data = json.loads(message)
                            if "realtime_input" in data:
                                for chunk in data["realtime_input"]["media_chunks"]:
                                    if chunk["mime_type"] == "audio/pcm":
                                        await session.send(input={"mime_type": "audio/pcm", "data": chunk["data"]})

                                    elif chunk["mime_type"] == "image/jpeg":
                                        print(f"Sending image chunk: {chunk['data'][:50]}")
                                        await session.send(input={"mime_type": "image/jpeg", "data": chunk["data"]})

                        except Exception as e:
                            print(f"Error sending to Gemini: {e}")
                    print("Client connection closed (send)")
                except Exception as e:
                    print(f"Error sending to Gemini: {e}")
                finally:
                    print("send_to_gemini closed")

            async def receive_from_gemini():
                """Receives responses from the Gemini API and forwards them to the client, looping until turn is complete."""
                try:
                    while True:
                        try:
                            print("receiving from gemini")
                            async for response in session.receive():
                                if response.server_content is None:
                                    print(f'Unhandled server message! - {response}')
                                    continue

                                model_turn = response.server_content.model_turn
                                if model_turn:
                                    for part in model_turn.parts:
                                        if hasattr(part, 'text') and part.text is not None:
                                            await client_websocket.send(json.dumps({"text": part.text}))
                                        elif hasattr(part, 'inline_data') and part.inline_data is not None:
                                            print("audio mime_type:", part.inline_data.mime_type)
                                            base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')

                                            await client_websocket.send(json.dumps({"audio": base64_audio}))

                                            print("audio received")

                                if response.server_content.turn_complete:
                                    print('\\n<Turn complete\\>')

                        except websockets.exceptions.ConnectionClosedOK:
                            print("Client connection closed normally (receive)")
                            break  
                        except Exception as e:
                            print(f"Error receiving from Gemini: {e}")
                            break

                except Exception as e:
                    print(f"Error receiving from Gemini: {e}")
                finally:
                    print("Gemini connection closed (receive)")

            send_task = asyncio.create_task(send_to_gemini())

            receive_task = asyncio.create_task(receive_from_gemini())
            await asyncio.gather(send_task, receive_task)

    except Exception as e:
        print(f"Error in Gemini session: {e}")
    finally:
        print("Gemini session closed.")

在函数内部,当 WebSocket 连接建立时,我们首先从客户端获取配置数据。我们还向 config 结构添加系统指令,以指导模型的行为。在这种情况下,我指示模型成为一个“日常生活助手”。然后,我们使用 client.aio.live.connect() 连接到 Gemini API,提供之前定义的模型和配置。这建立了与 Gemini API 的连接作为一个 session,允许服务器与 Gemini API 发送和接收数据。gemini_session_handler() 内的 send_to_gemini() 函数管理从 Android 客户端到 Gemini API 的消息流。它获取格式为 JSON 的媒体 chunks,包含音频和图像数据,并通过 WebSocket 连接使用 session.send() 方法将其发送到 Gemini API。

接下来是 receive_from_gemini() 函数,该函数负责从 Gemini API 接收数据并将其转发回客户端。它使用一个 while 循环不断监听来自 Gemini 的新消息。收到消息后,它检查响应是文本还是音频。如果是音频响应,它提取 Base64 数据并通过 WebSocket 连接使用 client_websocket.send() 方法将其发送回客户端。它还监视 turn_complete 标志,以确保接收到完整音频轮次的所有块。这两个函数打包成两个任务并使用 asyncio.gather() 方法并发运行。

这完成了后端服务器。

Android 应用

现在,让我们切换到 Android 端。首先,您应该下载、安装并设置 Android Studio 作为 IDE,并打开从我的 GitHub 下载的项目。

简要地说,让我们检查一下布局,它保存在 activity_main.xml 中,该文件位于 app/src/main/res/layout 的主文件夹下。

Image 7

Android 应用的布局

如您所见,这是一种非常简单的设计,由一个线性布局组成,其中包括一个用于图像捕获预览的 ImageView、一个状态指示器、三个用于相机捕获、录音开始和录音停止的 Material 按钮,以及一个可选的 TextView 用于显示聊天记录,但在此项目中尚未完全实现。

现在,让我们查看 MainActivity.kt 文件,这是我们应用的主要入口点,使用 Kotlin 编写。

关键变量

MainActivity.kt 中,我们从布局 XML 开始,定义了一些关键变量,包括对 UI 元素的引用:imageViewcaptureButtonstartButtonstopButtonchatLogstatusIndicator

class MainActivity : AppCompatActivity() {

    private lateinit var imageView: ImageView
    private lateinit var captureButton: Button
    private lateinit var startButton: Button
    private lateinit var stopButton: Button
    private lateinit var chatLog: TextView
    private lateinit var statusIndicator: ImageView

我们还有一些重要的变量用于管理连接和数据,包括:

websocket 作为我们与 Python 后端进行实时通信的核心,使用一个 URL,包括连接到 WebSocket 服务器的真实 IP 地址。确保你正确输入实际运行 Python 后端的 IP 地址。

    private var webSocket: WebSocketClient? = null
    private val URL = "ws://your_server_IP:9084"

isRecording 标志表示音频录制状态,audioRecord 处理麦克风音频,pcmData 存储临时音频,audioQueue 异步管理传入的音频块,audioTrack 处理解码音频响应的播放。currentFrameB64 存储 Base64 编码的图像数据。我们使用特定的压缩算法以确保与 Gemini 多模态 API 的兼容性,同时保持良好的用户体验。我们还有 recordInterval,一个用于发送音频块的协程作业,以及 isPlaying,以确保顺序播放。

    private var currentFrameB64: String? = null
    private var isRecording = false
    private var audioRecord: AudioRecord? = null
    private var pcmData = mutableListOf<Short>()
    private var job: Job? = null
    private var recordInterval: Job? = null
    private val audioQueue = mutableListOf<ByteArray>()
    private var isPlaying = false
    private var audioTrack: AudioTrack? = null;

功能

现在,让我们来看一下关键功能。onCreate() 函数初始化 UI 组件并设置点击监听器。

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)

    imageView = findViewById(R.id.imageView)
    captureButton = findViewById(R.id.captureButton)
    startButton = findViewById(R.id.startButton)
    stopButton = findViewById(R.id.stopButton)
    chatLog = findViewById(R.id.chatLog)
    statusIndicator = findViewById(R.id.statusIndicator) 
}

checkCameraPermission()checkRecordAudioPermission() 处理实时权限请求。

private fun checkCameraPermission() {
    if (ContextCompat.checkSelfPermission(
            this,
            Manifest.permission.CAMERA
        ) != PackageManager.PERMISSION_GRANTED
    ) {
        // 处理权限未授予
    }
}
private fun checkRecordAudioPermission() {
    if (ContextCompat.checkSelfPermission(
            this,
            Manifest.permission.RECORD_AUDIO
        ) != PackageManager.PERMISSION_GRANTED
    ) {
        // 处理权限未授予
    }
}

openCamera()createImageFile() 捕获图像并将其存储在临时文件中。

private fun openCamera() {
    val takePictureIntent = Intent(MediaStore.ACTION_IMAGE_CAPTURE)
    val photoFile: File? = try {
        createImageFile()
    } catch (ex: IOException) {
        // 创建文件时发生错误
        null
    }
    // 继续进行相机意图
}
private fun createImageFile(): File {
    val timeStamp: String = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.getDefault()).format(Date())
    val storageDir: File = getExternalFilesDir(Environment.DIRECTORY_PICTURES)!!
    return File.createTempFile(
        "JPEG_${timeStamp}_",
        ".jpg",
        storageDir
    ).apply {
        currentPhotoPath = absolutePath
    }
}

onActivityResult() 处理捕获的图像,调整大小、压缩并编码为 Base64 字符串,将生成的位图设置为预览。

override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
    super.onActivityResult(requestCode, resultCode, data)
    if (requestCode == CAMERA_REQUEST_CODE && resultCode == RESULT_OK) {
        val file = File(currentPhotoPath)

        val options = BitmapFactory.Options().apply {
            inJustDecodeBounds = true
        }
        BitmapFactory.decodeFile(file.absolutePath, options)

        val (originalWidth, originalHeight) = options.outWidth to options.outHeight
        val scaleFactor = calculateScaleFactor(originalWidth, originalHeight, MAX_IMAGE_DIMENSION)
        // 继续处理图像
    }
}

connect() 函数建立与 WebSocket 的连接,并定义事件。

private fun connect() {
    Log.d("WebSocket", "Connecting to: $URL")
    webSocket = object : WebSocketClient(URI(URL)) {
        override fun onOpen(handshakedata: ServerHandshake?) {
            Log.d("WebSocket", "Connected")
            isConnected = true
            updateStatusIndicator() 
            sendInitialSetupMessage()
        }

        override fun onMessage(message: String?) {
            Log.d("WebSocket", "Message Received: $message")
            receiveMessage(message)
        }

        override fun onClose(code: Int, reason: String?, remote: Boolean) {
            Log.d("WebSocket", "Connection Closed: $reason")
            isConnected = false
            updateStatusIndicator() 
            runOnUiThread {
                Toast.makeText(this@MainActivity, "Connection closed", Toast.LENGTH_SHORT).show()
            }
        }

        override fun onError(ex: Exception?) {
            Log.e("WebSocket", "Error: ${ex?.message}")
            isConnected = false
            updateStatusIndicator() 
        }
    }
    webSocket?.connect()
}

sendInitialSetupMessage() 将配置发送到后端。sendVoiceMessage() 将音频和图像数据打包为 JSON 通过 WebSocket 发送给 Gemini。

private fun sendInitialSetupMessage() {
    Log.d("WebSocket", "Sending initial setup message")
    val setupMessage = JSONObject()
    val setup = JSONObject()
    val generationConfig = JSONObject()
    val responseModalities = org.json.JSONArray()
    responseModalities.put("AUDIO")
    generationConfig.put("response_modalities", responseModalities)
    setup.put("generation_config", generationConfig)
    setupMessage.put("setup", setup)
    webSocket?.send(setupMessage.toString())
}
private fun sendVoiceMessage(b64PCM: String?) {
    if(webSocket?.isOpen == false){
        Log.d("WebSocket", "websocket not open")
        return
    }
    if (b64PCM == null) return

    val payload = JSONObject()
    val realtimeInput = JSONObject()
    val mediaChunks = org.json.JSONArray()
    val audioChunk = JSONObject()
    // 继续打包音频数据
}

receiveMessage() 处理响应,处理文本以显示在 chatLog 中,并将音频响应传递给 injestAudioChuckToPlay(),该函数将数据存储在队列中并调用 playNextAudioTrunk()playNextAudioTrunk() 函数顺序检索音频块并调用 playAudio。playAudio() 使用 Android AudioTrack() 库播放音频数据。

private fun receiveMessage(message: String?) {
    if (message == null) return

    val messageData = JSONObject(message)
    val response = Response(messageData)
    if (response.text != null) {
        displayMessage("GEMINI: " + response.text)
    }

    if (response.audioData != null) {
        injestAudioChuckToPlay(response.audioData)
    }
}
private fun injestAudioChuckToPlay(base64AudioChunk: String?) {
    if (base64AudioChunk == null) return

    GlobalScope.launch(Dispatchers.IO) {
        try {
            val arrayBuffer = base64ToArrayBuffer(base64AudioChunk)
            synchronized(audioQueue) {
                audioQueue.add(arrayBuffer)
            }
            if (!isPlaying) {
                playNextAudioChunk()
            }
            Log.d("Audio", "Audio chunk added to the queue")
        } catch (e: Exception) {
            Log.e("Audio", "Error processing chunk", e)
        }
    }
}
private fun playNextAudioChunk() {
    GlobalScope.launch(Dispatchers.IO) {
        while (true) {
            val chunk = synchronized(audioQueue) {
                if (audioQueue.isNotEmpty()) audioQueue.removeAt(0) else null
            } ?: break

            isPlaying = true
            playAudio(chunk)
        }
        isPlaying = false

        synchronized(audioQueue) {
            if (audioQueue.isNotEmpty()) {
                playNextAudioChunk()
            }
        }
    }
}

startAudioInput() 启动音频录制,创建 AudioRecord(),并触发协程任务以频繁录制音频。recordTrunk() 函数将 PCM 音频转换为 Base64,发送到后端,并清理临时数据。stopAudioInput() 停止音频录制,关闭 AudioRecord() 会话,并向服务器发送关闭消息。您可以在源代码库中找到这些函数的详细信息。

最后,updateStatusIndicator() 使用不同的颜色在状态图标上显示连接状态。

private fun updateStatusIndicator() {
    runOnUiThread {
        if (!isConnected) {
            statusIndicator.setImageResource(R.drawable.baseline_error_24)
            statusIndicator.setColorFilter(android.graphics.Color.RED)
        } else if (!isSpeaking) {
            statusIndicator.setImageResource(R.drawable.baseline_equalizer_24)
            statusIndicator.setColorFilter(android.graphics.Color.GRAY)
        } else {
            statusIndicator.setImageResource(R.drawable.baseline_equalizer_24)
            statusIndicator.setColorFilter(android.graphics.Color.GREEN)
        }
    }
}

亮点

这个 Android 应用的一个创新之处在于我们如何使用协程任务管理实时音频流,持续捕获音频,频繁发送每个音频块,并通过队列仔细管理音频播放。

此外,图像压缩是另一个关键改进。通过调整捕获图像的大小和压缩数据,我们在不牺牲用户体验的情况下大幅减少了图像数据的大小,这保持了网络开销低并确保了流畅的实时通信。

运行应用程序

应用程序现在准备运行。我们应该启动后端服务器和前端 Android 应用。

要启动后端服务器,只需执行 Python 脚本。您会看到 WebSocket 服务器正在“0.0.0.0:9084”上运行。这表明我们的后端服务器已成功在外部网络上运行。

Image 8

接下来,在连接到 Android Studio 的测试设备上运行 Android 应用。您会在设备管理器中看到您的物理设备,然后点击运行按钮。

Image 9

您现在应该在测试手机上看到干净的应用。如果出现红色警告标志,则表示连接问题。灰色指示器表示应用程序准备好进行交互。您可以点击“麦克风”按钮将指示器变为绿色并开始与 Gemini 聊天,或者点击“摄像头”按钮拍照与其聊天。

Image 10

连接错误

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...