How to Build a Real-Time Gemini 2.0 Learning Assistant with Interactive Canvas
- Rifx.Online
- Programming , Chatbots , Voice Assistants
- 19 Jan, 2025
Gemini Development Tutorial V5
In this tutorial, we will continue building fascinating real-time chat applications using Gemini 2.0 and its multimodal Live API. Let’s build a learning assistant with real-time voice and text interaction based on a drawing canvas this time.
You might have seen the old video from OpenAI https://youtu.be/_nSmkyDNulk to demonstrate how got-4o assists the student in learning math on a screen of Khan Academy by asking leading questions and providing hints to guide the student towards the correct answer.
By using our previous screen-sharing demo project, we can easily duplicate the same experience by sharing the screen of a learning website or document, operating some edit tools and letting Gemini 2.0 assist the user in learning the content.
However, for real-world applications, relying solely on screen sharing has several limitations. It is, at best, a hack to use, and in the real world, there are many reasons to avoid it, as it couples the application to the specific environment. For example, even if the web app is written in pure HTML and Javascript, it is almost impossible to reuse it to move to the mobile app. It is full of multi-thread with screen sharing, editing together, and voice talking, which needs operating across the app. So, when you want to develop a robust and commercial product, it can be rather difficult to integrate and control the user experience.
This is where the in-app document editor, specifically built around a canvas element, becomes essential. By building our own interface for image loading and editing, we gain more control and have higher scalability and flexibility for future features and uses.
System architecture
Now, let’s see the design of our project.
We will continue to use the same project structure as the previous screen-sharing demo, but we’ll introduce some key changes in how the client interacts with the server and the Gemini Multimodal Live API. The significant shift there is that we’ve replaced the screen-sharing component with an interactive canvas element where the user can load images and make some basic edits using a drawing tool. The client also includes the audio capture, playback, text transcription and the connection to the WebSocket for communication with the server.
On the server side, the code remains the same as our last demo; that is, upon receiving a message, the server forwards both of those media components’ image and audio to the Live API, and the image is captured from the canvas element. It’s also responsible for receiving the streaming response from the Gemini API and forwarding it back to the client. The voice streaming will also be fed to Gemini 1.5 for transcript generation.
Now, we are going to walk through the code. Since the number of lines is a little bit big, I’ll only explain the most important parts of the code. If you want to see the full code, you can find and download it in my GitHub repo, which contains not only the project in this tutorial but also the previous ones in the Gemini 2.0 series, including screen sharing, transcription output, and camera interaction.
Code walkthrough
Now, let’s start with the server side of the code walkthrough.
Server Design
Install the dependencies, including both the two Gemini APIs, the production google-generativeai
for speech-to-text generation for transcription and the experimental multimodal live API google-genai
for Gemini 2.0 real-time audio and image interaction, as well as the websockets
and pydub
that is for audio processing.
pip install --upgrade google-genai==0.3.0 google-generativeai==0.8.3 websockets pydub
Together with it, you should make sure the FFmpeg software is installed on your machine. For the Ubuntu system, you can use the apt-get install ffmpeg
to have it installed.
Moving to the code:
import asyncio
import json
import os
import websockets
from google import genai
import base64
import io
from pydub import AudioSegment
import google.generativeai as generative
import wave
## Load API key from environment
os.environ['GOOGLE_API_KEY'] = ''
generative.configure(api_key=os.environ['GOOGLE_API_KEY'])
MODEL = "gemini-2.0-flash-exp" # use your model ID
TRANSCRIPTION_MODEL = "gemini-1.5-flash-8b"
client = genai.Client(
http_options={
'api_version': 'v1alpha',
}
)
The first section configures the key from Google AI Studio access to the API through an environment variable and specifies the model that we’ll be using. Here, we use the gemini-2.0-flash-exp
model for the real-time image and audio interaction and the gemini-1.5-flash-8b
model for the speech-to-text transcription because it’s super fast and accurate enough for this easy task.
The core of the server-side logic is the gemini_session_handler
function.
This function is called every time a new WebSocket connection from a client is established. It manages the connection with Gemini’s live API and calls send_to_gemini()
and receive_from_gemini()
functions as a background task, which are the main parts of the data processing logic.
async def gemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
"""Handles the interaction with Gemini API within a websocket session."""
try:
config_message = await client_websocket.recv()
config_data = json.loads(config_message)
config = config_data.get("setup", {})
config["system_instruction"] = """You are a learning assistant.
Check the questions and answers from the students.
Ask leading questions and provide hints to guide the student towards the correct answer."""
async with client.aio.live.connect(model=MODEL, config=config) as session:
print("Connected to Gemini API")
async def send_to_gemini():
"""Sends messages from the client websocket to the Gemini API."""
try:
async for message in client_websocket:
try:
data = json.loads(message)
if "realtime_input" in data:
for chunk in data["realtime_input"]["media_chunks"]:
if chunk["mime_type"] == "audio/pcm":
await session.send({"mime_type": "audio/pcm", "data": chunk["data"]})
elif chunk["mime_type"] == "image/jpeg":
await session.send({"mime_type": "image/jpeg", "data": chunk["data"]})
except Exception as e:
print(f"Error sending to Gemini: {e}")
print("Client connection closed (send)")
except Exception as e:
print(f"Error sending to Gemini: {e}")
finally:
print("send_to_gemini closed")
async def receive_from_gemini():
"""Receives responses from the Gemini API and forwards them to the client, looping until turn is complete."""
try:
while True:
try:
print("receiving from gemini")
async for response in session.receive():
if response.server_content is None:
print(f'Unhandled server message! - {response}')
continue
model_turn = response.server_content.model_turn
if model_turn:
for part in model_turn.parts:
if hasattr(part, 'text') and part.text is not None:
await client_websocket.send(json.dumps({"text": part.text}))
elif hasattr(part, 'inline_data') and part.inline_data is not None:
print("audio mime_type:", part.inline_data.mime_type)
base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')
await client_websocket.send(json.dumps({"audio": base64_audio}))
# Accumulate the audio data here
if not hasattr(session, 'audio_data'):
session.audio_data = b''
session.audio_data += part.inline_data.data
print("audio received")
if response.server_content.turn_complete:
print('\n<Turn complete>')
# Transcribe the accumulated audio here
transcribed_text = transcribe_audio(session.audio_data)
if transcribed_text:
await client_websocket.send(json.dumps({
"text": transcribed_text
}))
# Clear the accumulated audio data
session.audio_data = b''
except websockets.exceptions.ConnectionClosedOK:
print("Client connection closed normally (receive)")
break # Exit the loop if the connection is closed
except Exception as e:
print(f"Error receiving from Gemini: {e}")
break
except Exception as e:
print(f"Error receiving from Gemini: {e}")
finally:
print("Gemini connection closed (receive)")
# Start send loop
send_task = asyncio.create_task(send_to_gemini())
# Launch receive loop as a background task
receive_task = asyncio.create_task(receive_from_gemini())
await asyncio.gather(send_task, receive_task)
except Exception as e:
print(f"Error in Gemini session: {e}")
finally:
print("Gemini session closed.")
First, it receives the configuration data from the client, which includes the system instruction and the response modality from the client.
Then, it establishes a connection with the Gemini API and starts a session
with the model.
The send_to_gemini()
function manages the flow of messages from the client to the Gemini API. It takes the media chunks sent by the client, packages the audio and image data into the Gemini API message format, and sends it. This is all done asynchronously, ensuring the streaming is seamless.
The receive_from_gemini()
function is responsible for listening to the Gemini API’s responses and forwarding the data to the client. We use a while loop that keeps running until the connection is terminated or an error is triggered. Once the model response is generated, we handle audio data and send it back to the client for play. Meanwhile, the audio data will be accumulated and once the turn_complete
flag is set, the audio data will be sent to the Gemini 1.5 flash model for transcription.
def transcribe_audio(audio_data):
"""Transcribes audio using Gemini 1.5 Flash."""
try:
# Make sure we have valid audio data
if not audio_data:
return "No audio data received."
# Convert PCM to MP3
mp3_audio_base64 = convert_pcm_to_mp3(audio_data)
if not mp3_audio_base64:
return "Audio conversion failed."
transcription_client = generative.GenerativeModel(model_name=TRANSCRIPTION_MODEL)
prompt = """Generate a transcript of the speech.
Please do not include any other text in the response.
If you cannot hear the speech, please only say '<Not recognizable>'."""
response = transcription_client.generate_content(
[
prompt,
{
"mime_type": "audio/mp3",
"data": base64.b64decode(mp3_audio_base64),
}
]
)
return response.text
except Exception as e:
print(f"Transcription error: {e}")
return "Transcription failed.", None
def convert_pcm_to_mp3(pcm_data):
"""Converts PCM audio to base64 encoded MP3."""
try:
# Create a WAV in memory first
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, 'wb') as wav_file:
wav_file.setnchannels(1) # mono
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(24000) # 24kHz
wav_file.writeframes(pcm_data)
wav_buffer.seek(0)
audio_segment = AudioSegment.from_wav(wav_buffer)
mp3_buffer = io.BytesIO()
audio_segment.export(mp3_buffer, format="mp3", codec="libmp3lame")
# Convert to base64
mp3_base64 = base64.b64encode(mp3_buffer.getvalue()).decode('utf-8')
return mp3_base64
except Exception as e:
print(f"Error converting PCM to MP3: {e}")
return None
The two supporting functions, the convert_pcm_to_mp3()
, are used, as its name suggests, to convert the model response audio to MP3 because the Gemini 1.5 audio input does not support PCM format. Then called by the transcribe_audio()
function, it will generate the transcript of the speech by talking to the legacy google-generativeai
API.
async def main() -> None:
async with websockets.serve(gemini_session_handler, "localhost", 9083):
print("Running websocket server localhost:9083...")
await asyncio.Future() # Keep the server running indefinitely
if __name__ == "__main__":
asyncio.run(main())
Finally, the main function is very straightforward, it sets up the WebSocket server to listen on a specific port. For each new connection, gemini_session_handler()
is called to establish a session
, then, because the asyncio.Future()
is being awaited, the server remains running indefinitely.
Client Design
Now, let’s move on to the client-side code written in HTML and JavaScript. The complete code can be downloaded from my GitHub repository.
Moving over to the HTML part, we have a simple layout with image upload, drawing controls, and a canvas that shows the image. And also a transcript text area to display the transcription result.
async function renderFileOnCanvas(file) {
if (file.type.startsWith("image/")) {
const reader = new FileReader();
reader.onload = (event) => {
const img = new Image();
img.onload = () => {
canvas.width = img.width; // Update Canvas width
canvas.height = img.height; // Update Canvas height
context.drawImage(img, 0, 0, canvas.width, canvas.height);
};
img.src = event.target.result;
};
reader.readAsDataURL(file);
}
}
In the Javascript part, the renderFileOnCanvas()
function is responsible for loading the selected image onto the canvas, it handles all of the steps necessary to get it displayed correctly inside the canvas, dynamically updating the canvas size so the images don’t get skewed.
imageLoader.addEventListener('change', async (e) => {
const file = e.target.files[0];
if (file) {
await renderFileOnCanvas(file);
}
});
This listener triggers the renderFileOnCanvas
function to draw the loaded image onto the canvas. It also checks the file parameter, and if it exists, it will call the renderFileOnCanvas
function to draw the selected image onto the canvas.
function captureImage() {
const imageData = canvas.toDataURL("image/jpeg").split(",")[1].trim();
currentFrameB64 = imageData;
}
window.addEventListener("load", async () => {
//await startWebcam();
setInterval(captureImage, 3000);
await initializeAudioContext();
connect();
});
The captureImage()
function is in charge of converting the content of the canvas into base64 so it can be sent to the server along with the audio input in the period setting of 3 seconds for both.
Finally, the startAudioInput()
and stopAudioInput()
functions manage microphone access, capture audio, and send it to the backend via WebSocket. We have already shown these in the previous tutorial. The receiveMessage()
function manages all incoming messages by displaying the text output or playing the audio output, as shown in the previous tutorial.
function receiveMessage(event) {
const messageData = JSON.parse(event.data);
const response = new Response(messageData);
if(response.text){
displayMessage("GEMINI: " + response.text);
}
if(response.audioData){
injestAudioChuckToPlay(response.audioData);
}
}
Those are the key parts of the code.
Now, let’s run the code.
Run the App
The procedure to bring up the web app is still the same.
Start the server by running the Python file. The WebSocket server will be running on port 8093, as defined in the code.
The HTML/JS source code (index.html
, pcm-processor.js
) can be downloaded from my GitHub Repository.
Start the client by running the command under your project folder :
python -m http.server
Now, we can access the local server on port 8000. Type the URL HTTP://localhost:8000 on the browser to try the app.
On the webpage, you can upload an image file and then click the talk button to chat with the Gemini 2.0 with your real-time drawing on the image canvas. Here is the video that I captured for the experience.
Thanks for reading. If you think it’s helpful, please Clap 👏 for this article. Your encouragement and comments mean a lot to me, mentally and financially. 🍔
Before you go:
✍️ If you have any questions, please leave me responses or find me on Xand Discord, where you can have my active support on development and deployment.
☕️ If you want exclusive resources and technical services, subscribing to the services on my Ko-fi will be a good choice.
💯 I am also open to being hired for any innovative and full-stack development jobs.