Complex SQL Joins with LangGraph and Waii
In the rapidly evolving landscape of data analytics, the ability to interact with data through natural language is becoming increasingly valuable. Conversational analytics aims to make complex data structures more accessible to users without specialized technical skills.
LangGraph is a framework for building stateful, multi-agent applications using language models. Waii provides text-to-SQL and text-to-chart capabilities, enabling natural language interactions with databases and data visualization.
This post explores how Waii’s capabilities can enhance LangGraph applications for conversational analytics. We’ll focus on Waii’s approach to handling complex joins in databases, a critical aspect of generating accurate SQL from natural language queries.
Waii’s Text-to-SQL Capabilities
At the core of conversational analytics is the ability to translate natural language into database operations. Waii offers a comprehensive text-to-SQL solution that stands out in several key areas:
- High-accuracy joins across complex schemas
- Scalable table selection for large databases
- Bespoke compiler for syntactic correctness and query optimization
- Specialized agentic flows for filters, sort order, common metrics, etc.
In the following section, we’ll take a deep dive into how Waii tackles complex joins. We focus on this because it’s a table-stakes capability for conversational analytics that many of today’s solutions struggle with. We will examine an example, see how the joins are constructed, and explain how you can easily integrate Waii into your existing LangGraph application to achieve these gains.
Deep Dive: Join Handling
An example
Imagine a streaming platform’s data team is tasked with creating a comprehensive director performance dashboard. They need to analyze what makes directors successful by combining data from movies, TV series, genres, keywords, awards, and actor collaborations.
The Instructions
Create a view that provides the following information for the top 5 directors (by the highest number of titles):
- Director’s name
- Total number of titles
- Most frequent genre
- Most frequent keyword
- Number of awards won
- Total revenue from movies
- List of actors they’ve worked with
The Query
The full query generated by Waii from these instructions can be found in Appendix A. Here’s a small fragment, showing some of the joins:
...
FROM ranked_directors AS rd
INNER JOIN movie_db.movies_and_tv.people AS p
ON rd.people_id = p.people_id
LEFT JOIN combined_director_genres AS cdg
ON rd.people_id = cdg.people_id AND cdg.genre_rank = 1
LEFT JOIN combined_director_keywords AS cdk
ON rd.people_id = cdk.people_id AND cdk.keyword_rank = 1
LEFT JOIN director_awards AS da
ON rd.people_id = da.people_id
LEFT JOIN director_revenue AS dr
ON rd.people_id = dr.people_id
LEFT JOIN director_actors AS d_actors
ON rd.people_id = d_actors.people_id
...
Query analysis
This query showcases a number of complex join capabilities:
- Complex Join Graph: 14 tables used in the query, with different qualifiers, arities, and semantics.
- Bridge Table Joins: Used to connect entities in many-to-many relationships (e.g., directors to movies, TV series, and actors).
- Dimension Table Joins: Utilized to enrich the data with descriptive information from genres and keywords tables.
- Complex Join Chains: Implemented to connect distant entities, such as linking directors to actors through their collaborative work.
- Full Outer Joins: Employed to combine a director’s work across both movies and TV series, ensuring comprehensive coverage.
- Left Joins for Optional Data: Applied when including data that might not exist for all directors (e.g., awards, revenue).
(This list is not exhaustive; there are many other considerations for accurate join handling, such as the difference between ON-clause and WHERE-clause conditions, join order, non-equi joins, lateral joins for semi-structured data, etc.)
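To make the ON-versus-WHERE pitfall concrete, here is a small self-contained sketch. It uses sqlite3 with made-up director/award tables (not the schema from the example) to show how moving a filter on the right-hand table from the ON clause to the WHERE clause silently turns a LEFT JOIN into an inner join:

```python
import sqlite3

# Tiny in-memory database: two directors, only one of whom has an award win.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE directors (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE awards (director_id INTEGER, is_winner INTEGER);
    INSERT INTO directors VALUES (1, 'Ava'), (2, 'Ben');
    INSERT INTO awards VALUES (1, 1);
""")

# Filter in ON: Ben is kept, with NULL award data (true outer join semantics).
on_rows = conn.execute("""
    SELECT d.name, a.is_winner
    FROM directors d
    LEFT JOIN awards a ON d.id = a.director_id AND a.is_winner = 1
    ORDER BY d.name
""").fetchall()

# Filter in WHERE: the NULLs produced for unmatched rows fail the predicate,
# so the LEFT JOIN silently degrades into an inner join and Ben disappears.
where_rows = conn.execute("""
    SELECT d.name, a.is_winner
    FROM directors d
    LEFT JOIN awards a ON d.id = a.director_id
    WHERE a.is_winner = 1
    ORDER BY d.name
""").fetchall()

print(on_rows)     # [('Ava', 1), ('Ben', None)]
print(where_rows)  # [('Ava', 1)]
```

This is exactly the distinction that matters for the optional-data joins above (awards, revenue): a director with no awards must still appear in the dashboard.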
The key to this is Waii's approach to understanding and representing database relationships. Here's how it works:
Knowledge Graph Construction
Waii automatically builds a comprehensive knowledge graph of the database objects. This graph incorporates information from multiple sources:
- Schema information
- Constraints (e.g., primary keys / foreign keys)
- Predictions based on analyzing column names and data patterns
- Join graphs extracted and ranked from query history
- Database documentation
- Relationships defined in data catalogs
- Feedback from system usage over time
This graph is continuously updated and refined. Every schema change, new query, and new piece of feedback is analyzed and integrated into the graph.
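As an illustration only (this is not Waii's implementation), such a knowledge graph can be pictured as tables for nodes and known join keys for edges, over which a shortest-path search proposes a join chain between two tables the user's question mentions. The table names and edges below are made up to match the running example:

```python
from collections import deque

# Edges inferred from constraints, query history, docs, catalogs, feedback.
join_graph = {
    ("directors", "movies_directors_bridge"): "people_id",
    ("movies_directors_bridge", "movies"): "movie_id",
    ("movies", "movies_genres_bridge"): "movie_id",
    ("movies_genres_bridge", "genres"): "id",
}

def neighbors(table):
    """Yield tables joinable to `table`, in either edge direction."""
    for (a, b), key in join_graph.items():
        if a == table:
            yield b, key
        elif b == table:
            yield a, key

def join_path(start, goal):
    """Breadth-first search for the shortest join chain between two tables."""
    queue, seen = deque([(start, [start])]), {start}
    while queue:
        table, path = queue.popleft()
        if table == goal:
            return path
        for nxt, _ in neighbors(table):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + [nxt]))
    return None

print(join_path("directors", "genres"))
# ['directors', 'movies_directors_bridge', 'movies',
#  'movies_genres_bridge', 'genres']
```

Note how the search naturally routes through the bridge tables, which is what connects "directors" to "genres" even though no single table relates them directly.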
Agentic Flows for Query Construction
With the knowledge graph in place, Waii employs a sequence of agentic flows to construct the optimal query:
1. Table Selection: Analyzing the user’s request to identify the most relevant tables. Common join relationships and understanding of the semantics of the relationships are used to find tables and paths that might not be directly semantically linked to the user input.
2. Join Graph Analysis: Proposing and evaluating potential join paths between selected tables. This includes scoring how well the join graph is aligned with previously seen joins and the semantic understanding of the relationships.
3. Evaluation/Refinement of Join Conditions: A separate check to make sure outer joins and join conditions are applied correctly. This is where we also look at ON-clause vs. WHERE-clause conditions for outer joins.
4. Query Construction: Building the SQL query based on the chosen join graph and conditions.
5. Compilation and Optimization: Ensuring the joins are syntactically correct and optimized for performance. We also enforce operational constraints placed by the user on the query (e.g., max output rows, max input partitions).
The result is a SQL query that not only answers the user’s question accurately but does so in a way that’s optimized for the specific database structure and query engine.
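One way to picture the join graph analysis step is scoring candidate join paths against join edges seen in query history, so well-trodden paths win over plausible-looking but rarely used ones. The sketch below is hypothetical (made-up tables and frequencies), not Waii's actual scoring function:

```python
# Historical frequency of each join edge, e.g. mined from the query log.
edge_frequency = {
    ("directors", "movies_directors_bridge"): 120,
    ("movies_directors_bridge", "movies"): 115,
    ("directors", "people_awards_bridge"): 40,
    ("movies", "reviews"): 2,
}

def score_path(path):
    """Average historical frequency of a path's edges; unseen edges score 0."""
    edges = list(zip(path, path[1:]))
    return sum(edge_frequency.get(e, 0) for e in edges) / len(edges)

candidates = [
    ["directors", "movies_directors_bridge", "movies"],
    ["directors", "people_awards_bridge", "movies"],  # partly unseen, weaker
]
best = max(candidates, key=score_path)
print(best)  # ['directors', 'movies_directors_bridge', 'movies']
```

In practice the score would also weigh semantic signals (column names, constraints, documentation), not just history, but the idea of ranking alternative join graphs rather than committing to the first one found carries over.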
Building a Conversational Analytics Application
Now that we understand how Waii handles joins and text-to-SQL, let’s explore how we can leverage this capability in combination with LangGraph to build a sophisticated conversational analytics application.
LangGraph is the de facto framework for building agentic systems. For any LangGraph application needing precise, thoughtful database access, Waii is a great addition. Integrating Waii with LangGraph allows developers to create systems that execute complex queries while maintaining context across interactions, boosting the application’s overall intelligence.
Implementation Details
Implementing this system involves several key components:
1. LangGraph Framework: Provides the overall structure for the multi-agent system, managing state and agent interactions.
2. Waii API Integration: The SQL Generation and Visualization Agents would make calls to Waii’s APIs to leverage its text-to-SQL and text-to-chart capabilities.
3. Natural Language Processing: For understanding user inputs and generating human-readable responses.
4. Waii Execution API: To execute the generated SQL queries against the actual database. Code is injected to enforce user-level security policies such as limiting row / column access.
5. State Management: To maintain context across multiple user interactions, allowing for follow-up questions and iterative analysis.
The flow of a typical interaction might look like this:
- User inputs a question.
- A LangGraph Question Classifier decides if the request is best answered from memory or from the database.
- [optional] The Waii SQL Generator creates an optimized SQL query.
- [optional] The Waii SQL Executor injects security constraints, executes the query, and retrieves results.
- [optional] A Result classifier decides if the output should be data or visualization.
- [optional] The Waii Chart Generator creates relevant charts from the data and metadata.
- A LangGraph Insight Generation Agent synthesizes the final results for the user.
- The loop repeats.
(Optional, not shown in the diagram: on error or exception, LangGraph loops back, rewrites the input, and regenerates the required objects.)
Throughout this process, the Conversation Management Agent maintains state, allowing for contextual follow-up questions and a more natural, flowing interaction.
The full implementation of the example is given in Appendix B.
Benefits and Use Cases
This integration of LangGraph and Waii for database access offers several key benefits:
- Accessibility: Complex data analysis becomes accessible to non-technical users through natural language interaction.
- Depth of Analysis: The system can handle intricate queries that would be challenging to formulate manually.
- Contextual Understanding: Maintained state allows for more natural, context-aware conversations about data.
- Visual Insights: Automatic generation of relevant visualizations enhances understanding of the data.
- Scalability: The system can adapt to large, complex databases without a proportional increase in complexity for the end-user.
Potential use cases span various industries:
- Business Intelligence: Executives can query complex business data without needing to learn SQL or BI tools.
- Healthcare: Researchers can explore large medical databases, correlating diverse factors in patient outcomes.
- Finance: Analysts can quickly investigate market trends and company performance across multiple dimensions.
- E-commerce: Marketing teams can analyze customer behavior patterns to inform strategy.
- Education: Administrators can gain insights into student performance and resource allocation.
Conclusion
The combination of LangGraph’s multi-agent capabilities and Waii’s advanced text-to-SQL and visualization features opens up new opportunities in analytics and data processing. By making complex data analysis accessible through natural language, this approach dramatically lowers the barrier to high-quality insights from data.
We’d love to hear from you: How are you addressing these challenges today? What applications are you building with these capabilities?
Appendix A: Query
The full SQL query looks like this:
WITH director_movie_count AS (
SELECT
mdb.people_id,
COUNT(m.movie_id) AS movie_count
FROM movie_db.movies_and_tv.movies_directors_bridge AS mdb
INNER JOIN movie_db.movies_and_tv.movies AS m
ON mdb.movie_id = m.movie_id
GROUP BY
mdb.people_id
),
director_tv_count AS (
SELECT
tsdb.people_id,
COUNT(ts.tv_series_id) AS tv_count
FROM movie_db.movies_and_tv.tv_series_directors_bridge AS tsdb
INNER JOIN movie_db.movies_and_tv.tv_series AS ts
ON tsdb.tv_series_id = ts.tv_series_id
GROUP BY
tsdb.people_id
),
combined_counts AS (
SELECT
COALESCE(dmc.people_id, dtc.people_id) AS people_id,
COALESCE(dmc.movie_count, 0) + COALESCE(dtc.tv_count, 0) AS total_count
FROM director_movie_count AS dmc
FULL OUTER JOIN director_tv_count AS dtc
ON dmc.people_id = dtc.people_id
),
ranked_directors AS (
SELECT
combined_counts.people_id,
combined_counts.total_count,
RANK() OVER (ORDER BY combined_counts.total_count DESC NULLS LAST) AS rank
FROM combined_counts
),
director_genres AS (
SELECT
rd.people_id,
g.name AS genre_name,
COUNT(*) AS genre_count
FROM ranked_directors AS rd
LEFT JOIN movie_db.movies_and_tv.movies_directors_bridge AS mdb
ON rd.people_id = mdb.people_id
LEFT JOIN movie_db.movies_and_tv.movies_genres_bridge AS mgb
ON mdb.movie_id = mgb.movie_id
LEFT JOIN movie_db.movies_and_tv.genres AS g
ON mgb.id = g.id
GROUP BY
rd.people_id,
g.name
UNION ALL
SELECT
rd.people_id,
g.name AS genre_name,
COUNT(*) AS genre_count
FROM ranked_directors AS rd
LEFT JOIN movie_db.movies_and_tv.tv_series_directors_bridge AS tsdb
ON rd.people_id = tsdb.people_id
LEFT JOIN movie_db.movies_and_tv.tv_series_genres_bridge AS tsgb
ON tsdb.tv_series_id = tsgb.tv_series_id
LEFT JOIN movie_db.movies_and_tv.genres AS g
ON tsgb.id = g.id
GROUP BY
rd.people_id,
g.name
),
combined_director_genres AS (
SELECT
director_genres.people_id,
director_genres.genre_name,
SUM(director_genres.genre_count) AS total_genre_count,
RANK()
OVER (PARTITION BY director_genres.people_id ORDER BY SUM(director_genres.genre_count) DESC NULLS LAST)
AS genre_rank
FROM director_genres
GROUP BY
director_genres.people_id,
director_genres.genre_name
),
director_keywords AS (
SELECT
rd.people_id,
k.name AS keyword_name,
COUNT(*) AS keyword_count
FROM ranked_directors AS rd
LEFT JOIN movie_db.movies_and_tv.movies_directors_bridge AS mdb
ON rd.people_id = mdb.people_id
LEFT JOIN movie_db.movies_and_tv.movies_keywords_bridge AS mkb
ON mdb.movie_id = mkb.movie_id
LEFT JOIN movie_db.movies_and_tv.keywords AS k
ON mkb.id = k.id
GROUP BY
rd.people_id,
k.name
),
combined_director_keywords AS (
SELECT
director_keywords.people_id,
director_keywords.keyword_name,
SUM(director_keywords.keyword_count) AS total_keyword_count,
RANK()
OVER (
PARTITION BY director_keywords.people_id ORDER BY SUM(director_keywords.keyword_count) DESC NULLS LAST
)
AS keyword_rank
FROM director_keywords
GROUP BY
director_keywords.people_id,
director_keywords.keyword_name
),
director_awards AS (
SELECT
pab.people_id,
COUNT(*) AS award_count
FROM movie_db.movies_and_tv.people_awards_bridge AS pab
INNER JOIN movie_db.movies_and_tv.awards AS a
ON pab.award_id = a.award_id
WHERE
a.iswinner = 'True'
GROUP BY
pab.people_id
),
director_revenue AS (
SELECT
mdb.people_id,
SUM(m.revenue) AS total_revenue
FROM movie_db.movies_and_tv.movies_directors_bridge AS mdb
INNER JOIN movie_db.movies_and_tv.movies AS m
ON mdb.movie_id = m.movie_id
GROUP BY
mdb.people_id
),
director_actors AS (
SELECT DISTINCT
rd.people_id,
p.name AS actor_name
FROM ranked_directors AS rd
LEFT JOIN movie_db.movies_and_tv.movies_directors_bridge AS mdb
ON rd.people_id = mdb.people_id
LEFT JOIN movie_db.movies_and_tv.movies_actors_bridge AS mab
ON mdb.movie_id = mab.movie_id
LEFT JOIN movie_db.movies_and_tv.people AS p
ON mab.people_id = p.people_id
UNION
SELECT DISTINCT
rd.people_id,
p.name AS actor_name
FROM ranked_directors AS rd
LEFT JOIN movie_db.movies_and_tv.tv_series_directors_bridge AS tsdb
ON rd.people_id = tsdb.people_id
LEFT JOIN movie_db.movies_and_tv.tv_series_actors_bridge AS tsab
ON tsdb.tv_series_id = tsab.tv_series_id
LEFT JOIN movie_db.movies_and_tv.people AS p
ON tsab.people_id = p.people_id
)
SELECT
p.name,
rd.total_count AS number_of_titles,
ARRAY_AGG(DISTINCT cdg.genre_name) AS most_frequent_genres,
ARRAY_AGG(DISTINCT cdk.keyword_name) AS most_frequent_keywords,
COALESCE(da.award_count, 0) AS award_count,
COALESCE(dr.total_revenue, 0) AS total_revenue,
ARRAY_AGG(DISTINCT d_actors.actor_name) AS actors_worked_with
FROM ranked_directors AS rd
INNER JOIN movie_db.movies_and_tv.people AS p
ON rd.people_id = p.people_id
LEFT JOIN combined_director_genres AS cdg
ON rd.people_id = cdg.people_id AND cdg.genre_rank = 1
LEFT JOIN combined_director_keywords AS cdk
ON rd.people_id = cdk.people_id AND cdk.keyword_rank = 1
LEFT JOIN director_awards AS da
ON rd.people_id = da.people_id
LEFT JOIN director_revenue AS dr
ON rd.people_id = dr.people_id
LEFT JOIN director_actors AS d_actors
ON rd.people_id = d_actors.people_id
WHERE
rd.rank <= 5
GROUP BY
p.name,
rd.total_count,
da.award_count,
dr.total_revenue
ORDER BY
rd.total_count DESC NULLS LAST,
p.name ASC
Appendix B: LangGraph application
Here’s the full LangGraph application (also on GitHub):
import os
import sys
from typing import List, Optional, Dict, Any

import pandas as pd
import plotly
from pydantic import BaseModel
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from waii_sdk_py import WAII
from waii_sdk_py.query import QueryGenerationRequest, RunQueryRequest


class State(BaseModel):
    database_description: str = ''
    query: str = ''
    sql: str = ''
    data: List[Dict[str, Any]] = []
    chart: Any = ''
    insight: str = ''
    response: str = ''
    error: Optional[str] = None
    path_decision: str = ""


class LanggraphWorkflowManager:

    def init_waii(self):
        WAII.initialize(url=os.getenv("WAII_URL"), api_key=os.getenv("WAII_API_KEY"))
        WAII.Database.activate_connection(os.getenv("DB_CONNECTION"))

    def create_workflow(self) -> StateGraph:
        workflow = StateGraph(State)

        workflow.add_node("Question Classifier", self.question_classifier)
        workflow.add_node("Result Classifier", self.result_classifier)
        workflow.add_node("SQL Generator", self.sql_generator)
        workflow.add_node("SQL Executor", self.sql_executor)
        workflow.add_node("Chart Generator", self.chart_gen)
        workflow.add_node("Insight Generator", self.insight_generator)
        workflow.add_node("Result Synthesizer", self.result_synthesizer)

        workflow.set_entry_point("Question Classifier")
        workflow.add_conditional_edges(
            "Question Classifier",
            lambda state: state.path_decision,
            {
                "database": "SQL Generator",
                "visualization": "Chart Generator",
                "general": "Insight Generator"
            }
        )

        workflow.add_edge("SQL Generator", "SQL Executor")
        workflow.add_edge("SQL Executor", "Result Classifier")
        workflow.add_conditional_edges(
            "Result Classifier",
            lambda state: state.path_decision,
            {
                "visualization": "Chart Generator",
                "data": "Result Synthesizer"
            }
        )
        workflow.add_edge("Chart Generator", "Result Synthesizer")
        workflow.add_edge("Insight Generator", "Result Synthesizer")
        workflow.add_edge("Result Synthesizer", "Question Classifier")

        return workflow

    def question_classifier(self, state: State) -> State:
        state.database_description = self.format_catalog_info(WAII.Database.get_catalogs())
        state.query = input("Question: ")

        prompt = ChatPromptTemplate.from_messages([
            ("human",
             "Database info: \n---\n{database_description}\n---\n"
             "Answer 'database' if this question is likely related to information in the database. Otherwise answer 'general'? Question: '{query}'. "
             "Consider the information you have about the database, when in doubt answer 'database'")
        ])
        chain = prompt | ChatOpenAI() | StrOutputParser()
        classification = chain.invoke({"query": state.query, "database_description": state.database_description}).strip().lower()
        return state.model_copy(update={"path_decision": classification, "error": None})

    def sql_generator(self, state: State) -> State:
        sql = WAII.Query.generate(QueryGenerationRequest(ask=state.query)).query
        return state.model_copy(update={"sql": sql, "insight": ""})

    def sql_executor(self, state: State) -> State:
        data = WAII.Query.run(RunQueryRequest(query=state.sql)).rows
        return state.model_copy(update={"data": data}, deep=True)

    def chart_gen(self, state: State) -> State:
        df_data = pd.DataFrame(state.data)
        chart = WAII.Chart.generate_chart(df=df_data)
        return state.model_copy(update={"chart": chart.chart_spec, "error": None}, deep=True)

    def result_classifier(self, state: State) -> State:
        state.chart = ''
        prompt = ChatPromptTemplate.from_messages([
            ("human",
             "Is the following question best answered by 'data' or a 'visualization'? Question: '{query}'. "
             "Output: Strictly respond with either 'data', or 'visualization'. No additional text.")
        ])
        chain = prompt | ChatOpenAI() | StrOutputParser()
        classification = chain.invoke({"query": state.query}).strip().lower()
        return state.model_copy(update={"path_decision": classification, "error": None})

    def insight_generator(self, state: State) -> dict:
        prompt = ChatPromptTemplate.from_messages([("human", "{query}")])
        chain = prompt | ChatOpenAI() | StrOutputParser()
        insight = chain.invoke({"query": state.query})
        return state.model_copy(update={"insight": insight, "sql": "", "data": [], "error": None}, deep=True)

    def result_synthesizer(self, state: State) -> State:
        model = ChatOpenAI()
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are an expert assistant in analyzing data"),
            ("human", "\n User Question: '{query}'. "
                      "\n Results of query (if any): '{data}'."
                      "\n LLM results (if any): '{insight}'."
                      "\n\n Instructions: Answer the user with this information.")
        ])
        chain = prompt | model | StrOutputParser()
        data = "\n".join(" | ".join(f"{key}: {value}" for key, value in row.items()) for row in state.data)
        output = chain.invoke({"query": state.query, "data": data, "insight": state.insight}).strip().lower()
        if state.chart:
            df = pd.DataFrame(state.data)
            exec(state.chart.plot)
        print('Answer: ' + output)
        return state.model_copy(update={"response": output}, deep=True)

    def __init__(self):
        self.workflow = self.create_workflow()
        self.app = self.workflow.compile()
        self.init_waii()
        print(self.app.get_graph().draw_ascii())

    def format_catalog_info(self, catalogs):
        return "\n".join([
            f"Database: {catalog.name}\n" +
            "\n".join([
                f"  Schema: {schema.name.schema_name}\n  Description: {schema.description}"
                for schema in catalog.schemas
            ]) + "\n"
            for catalog in catalogs.catalogs
        ])

    def run_workflow(self):
        while True:
            try:
                initial_state = State()
                app_response = self.app.invoke(initial_state)
            except Exception as e:
                print(f"Error in workflow: {e}. Will restart.")
                LanggraphWorkflowManager().run_workflow()


if __name__ == "__main__":
    LanggraphWorkflowManager().run_workflow()