GCP – Unleashing the Power of AI: Automating SQL Query Generation and Real-Time Data Streaming with Confluent and Google Cloud
In today’s data-driven world, companies need real-time insights from vast datasets to optimize supply chains, predict demand fluctuations, and improve patient outcomes. However, extracting these insights often involves complex SQL queries and intricate data pipelines. This is where the power of large language models (LLMs) and real-time data streaming converge, to enhance data analytics.
This blog explores how to harness LLMs to automate SQL query generation, streamlining data analysis workflows. We’ll delve into how to integrate LLMs with Confluent and Vertex AI to create a powerful, end-to-end solution for real-time data processing and insights. This integration can be useful for the following use cases:
Accelerated data exploration: LLMs can generate SQL queries based on natural language prompts, enabling users to quickly explore datasets without SQL expertise.
Automated report generation: By understanding data schema and business requirements, LLMs can generate complex SQL queries for report generation, saving time and reducing errors.
Data pipeline optimization: LLMs can analyze existing SQL queries and suggest improvements, leading to more efficient data processing pipelines.
Anomaly detection: By identifying patterns in historical data, LLMs can generate SQL queries to detect anomalies and outliers.
In addition, integrating LLMs with Confluent and Vertex AI helps resolve numerous challenges and problems in the data analytics space:
Writing complex SQL queries: Writing and optimizing complex SQL queries can be time-consuming and error-prone, requiring specialized data engineering skills.
Real-time Data analysis on real-time data: Traditional batch processing methods often lack the speed and agility needed for real-time decision-making.
Data silos: Disparate data sources and formats hinder a holistic view of business operations.
To address these challenges and maximize the benefits of LLMs, Confluent and Google Cloud Vertex AI play essential roles:
Automated SQL generation: LLMs on Vertex AI/Gemini translate natural language requests into efficient SQL queries, empowering business users to access data without specialized skills.
Real-time data streaming: Confluent facilitates continuous data ingestion, processing, and delivery, ensuring that insights are readily available for immediate action.
Unified data platform: Integrating Confluent with Google Cloud services like BigQuery (or Cloud SQL) creates a centralized data platform, breaking down data silos and providing a comprehensive view.
By combining these technologies, organizations can create robust and scalable solutions for automating SQL query generation.
In the next section, we’ll dive deeper into the technical implementation and explore how to integrate these technologies.
Solution details
In the following demo, we take a look at COVID data and utilize Google’s Speech-to-Text to convert spoken queries into text, setting the stage for a sophisticated data processing workflow. Confluent Cloud’s FlinkAI is specifically utilized to manage calls to Google’s remote inference engine, Gemini, providing efficient and timely processing of SQL queries that were generated from user inputs. This integration within Confluent’s microservices architecture, facilitated by Apache Kafka for real-time data streaming, delivers highly integrated communication and data flow between services. Then, once the data has been retrieved and processed, Gemini summarizes the findings into clear, insightful summaries that are then converted back into natural-sounding speech using Google’s text-to-speech. This robust system, combining Google’s AI capabilities with the specialized function of FlinkAI within Confluent Cloud, is a streamlined approach to delivering fast, accurate, and accessible data-driven insights through intuitive voice commands. This solution not only demonstrates the power of voice and AI integration but also opens up new possibilities for making data-driven insights more accessible to everyone.
How it works:
Voice input: Users interact with a user-friendly voice-enabled interface, articulating their data queries in natural language, such as requesting, “Give me all COVID-19 cases in France in 2021.”
Speech-to-text: Google Speech-to-Text service converts the spoken input into text.
SQL query formation: The text is processed by Gemini to generate an SQL query.
Query execution: The SQL query is executed to fetch the relevant data.
Data summarization: Gemini then summarizes the retrieved data into a concise format.
Text-to-speech: A text-to-speech service converts the summarized text back into natural-sounding speech, which is delivered to the user.
The diagram above shows the overall flow from the user – in natural language – to the AI models and back to the user.
The solution implements the following features:
Natural language interface: Users can interact with data using simple, intuitive language, eliminating the need for SQL expertise.
Automated query optimization: Using Gemini, Vertex AI leverages its knowledge of data structures and query patterns to generate efficient queries, optimizing performance.
Real-time data pipelines: Confluent’s streaming capabilities provide insights with minimal latency, enabling proactive decision-making.
Scalability and security: The solution leverages the scalability and security of Google Cloud, ensuring data integrity and compliance (with healthcare regulations).
Let’s take a deeper look at the various components.
1. Speech to Text
The initial phase of our interactive data processing system begins with converting speech to text. Utilizing a KStream application, the solution handles audio inputs where each audio file is processed to extract textual queries. This process involves the AudioProcessor class which, upon receiving an audio file, leverages Google’s Speech-to-Text API to perform accurate and fast speech recognition. Once the audio is processed, the resulting text is encapsulated as an SQLRequest, which contains both the query and session information. This SQLRequest is then forwarded to another topic within our streaming architecture, setting the stage for subsequent SQL generation and data retrieval steps. This transition from audio to text helps ensure that user queries are quickly and accurately converted into actionable database queries, ready for deeper analysis and response generation.
<ListValue: [StructValue([(‘code’, ‘@Slf4jrnpublic class AudioProcessor implements Processor<String, AudioQuery, String, SQLRequest> {rnrn private ProcessorContext<String, SQLRequest> context;rnrn @Overridern public void init(ProcessorContext<String, SQLRequest> context) {rn this.context = context;rn }rnrn @Overridern public void process(Record<String, AudioQuery> record) {rn final AudioQuery audioQuery = record.value();rnrn SpeechSettings settings;rn try {rn settings = SpeechSettings.newBuilder().setEndpoint(“speech.googleapis.com:443”).build();rn } catch (IOException e) {rn log.error(“Error creating SpeechSettings.”, e);rn throw new RuntimeException(e);rn }rnrn try (SpeechClient speechClient = SpeechClient.create(settings)) {rn final byte[] audioFile = audioQuery.getAudio();rnrn log.info(“Processing audio for session id: {}”, audioQuery.getSessionId());rnrn // Builds the sync recognize requestrn RecognitionConfig config =rn RecognitionConfig.newBuilder()rn .setEncoding(RecognitionConfig.AudioEncoding.WEBM_OPUS)rn .setLanguageCode(“en-US”)rn .build();rn RecognitionAudio audio = RecognitionAudio.newBuilder()rn .setContent(ByteString.copyFrom(audioFile))rn .build();rn RecognizeResponse response = speechClient.recognize(config, audio);rn List<SpeechRecognitionResult> results = response.getResultsList();rn if (results.isEmpty()) {rn log.info(“No results found for session id: {}”, audioQuery.getSessionId());rn return;rn }rnrn final String query = results.get(0).getAlternatives(0).getTranscript();rn final SQLRequest sqlRequest = new SQLRequest(query, audioQuery.getSessionId());rn context.forward(new Record<>(audioQuery.getSessionId(), sqlRequest, record.timestamp()));rnrn log.info(“Results: {}”, results);rnrn } catch (Exception e) {rn log.error(“Error processing audio for session id: {}”, audioQuery.getSessionId(), e);rn throw new RuntimeException(e);rn }rn }rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702ceca640>)])]>
2. Human query to SQL
A standout feature of our demo is the sqlgenerator model, which turns spoken queries into precise SQL commands. This model’s capability hinges on its sophisticated prompt system, designed to handle complex natural language inputs. The prompt details the database schema, guiding the AI to generate SQL queries that are accurate and contextually aware. For example, when a user asks for the total number of COVID-19 tests and first-dose vaccinations for each country in the latest week, the model constructs a query based on an intricate understanding of database structures and relationships, as described in the prompt. This involves parsing and translating diverse data types and table relationships into a cohesive SQL command. Additionally, it outputs a comprehensive JSON description that elucidates the query’s purpose and the data schema it impacts. This intricate prompt design underscores our innovative approach, enabling complex database queries through straightforward voice commands.
<ListValue: [StructValue([(‘code’, ‘CREATE MODEL sqlgeneratorrnINPUT(query STRING)rnOUTPUT(`sql` STRING)rnCOMMENT ‘Human to SQL’rnWITH (rn ‘provider’ = ‘googleai’,rn ‘task’ = ‘text_generation’,rn ‘googleai.endpoint’ = ‘https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent’,rn ‘googleai.PARAMS.top_p’ = ‘0.95’,rn ‘googleai.PARAMS.top_k’ = ’64’,rn ‘googleai.PARAMS.temperature’ = ‘1’,rn ‘googleai.api_key’ = ‘{{sessionconfig/sql.secrets.gcp_key}}’,rn ‘googleai.system_prompt’ = ‘Given the database schema and structures provided below, generate a Postgres-compatible SQL query to select the required data, and provide a detailed description in JSON format about the query, the data schema, and data types.rnrnHere is the schema:rnrnTable 1: `vaccine_data.covid_testing_data`rn- `country`: character varying(255)rn- `country_code`: character varying(10)rn- `year_week`: character varying(20)rn…rnrnTable 2: `vaccine_data.covid_vaccination_data`rn- `reportingcountry`: character varying(255)rn- `denominator`: double precisionrn…rnrnTable 3: `vaccine_data.hospital_occupancy`rn- `indicator`: text NOT NULLrn- `date`: text NOT NULLrn…rnrnAn example request for data:rn”Please provide the total number of tests done and the total first-dose vaccinations for each country in the latest week available.”rnrnExpected SQL Query:rn“`sqlrnSELECTrn t.country,rn MAX(t.year_week) AS latest_week,rn SUM(t.tests_done) AS total_tests_done,rn SUM(v.firstdose) AS total_first_dose_vaccinationsrnFROMrn vaccine_data.covid_testing_data trnJOINrn vaccine_data.covid_vaccination_data vrnONrn t.country = v.reportingcountryrn AND t.year_week = v.year_weekrnGROUP BYrn t.country;rn“`rnrnExpected JSON output:rn“`jsonrn{rn “query_description”: “This query retrieves the total number of tests done and the total number of first-dose vaccinations for each country in the latest week available from the respective tables.”,rn “schema”: {rn “country”: “character varying(255)”,rn “latest_week”: “character varying(20)”,rn “total_tests_done”: “integer”,rn “total_first_dose_vaccinations”: “integer”rn },rn “sql_query”: “SELECT t.country, MAX(t.year_week) AS latest_week, SUM(t.tests_done) AS total_tests_done, SUM(v.firstdose) AS total_first_dose_vaccinations FROM vaccine_data.covid_testing_data t JOIN vaccine_data.covid_vaccination_data v ON t.country = v.reportingcountry AND t.year_week = v.year_week GROUP BY t.country;”rn}rn“`rnrnHuman: ‘rn);’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702ceca700>)])]>
<ListValue: [StructValue([(‘code’, ‘{rn “sessionId”: “385fa7dc-0095-0c1a-d7bf-9654bf135ed6”,rn “query”: “how many coid 19 in 2021 for France”,rn “sql”: ““`json\n{\n \”schema\”: {\n \”new_cases\”: \”INT\”\n },\n \”sql\”: \”SELECT SUM(new_cases) AS new_cases FROM vaccine_data.covid_testing_data WHERE country = ‘France’ AND year_week LIKE ‘2021%’;\”,\n \”description\”: \”Total number of new COVID-19 cases in France for the year 2021.\”\n}\n“`”rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702ceca9d0>)])]>
3. Query execution
Following the generation of the SQL query by the sqlgenerator model, the subsequent step is its execution, which retrieves the desired data from the databases. Once the data is obtained, the system formats the results into a markdown table. This structuring is crucial as it prepares the data for further processing, tailoring it for readability and further analysis. The formatted table is then processed by another call to the Gemini inference engine, which creates a concise, human-readable summary that can be presented verbally.
<ListValue: [StructValue([(‘code’, ‘{rn “sessionId”:”bb76112c-3900-121d-d365-12bee9de421b”,rn “executedQuery”:”SELECT * FROM vaccine_data.covid_testing_data WHERE country = ‘France’ AND year_week LIKE ‘2021%’;”,rn “renderedResult”:”| country | country_code | year_week | level | region | region_name | new_cases | tests_done | population | testing_rate | positivity_rate | testing_data_source |\n| ——- | ———— | ——— | ——– | —— | ———– | ——— | ———- | ———- | —————— | ——————- | ——————- |\n| France | FR | 2021-W43 | national | FR | France | 20604 | 3374137 | 67391582 | 50.06763307619044 | 0.6106450330854971 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 21668 | 3047258 | 67391582 | 45.217190479368774 | 0.7110654890396546 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 30112 | 2408920 | 67391582 | 35.745117246245975 | 1.2500207561894956 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 25332 | 2277710 | 67391582 | 33.79813817102558 | 1.1121696791953322 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 36249 | 4897647 | 67391582 | 72.6744625166983 | 0.7401309240947745 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 32067 | 3014996 | 67391582 | 44.73846600010072 | 1.0635835006082928 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 30575 | 2250466 | 67391582 | 33.39387403014222 | 1.3586075061787204 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 43273 | 2802684 | 67391582 | 41.588042850811846 | 1.543984266510245 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 17001 | 3704239 | 67391582 | 54.965900637263566 | 0.4589606664148831 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 36215 | 2197339 | 67391582 | 32.605541149041436 | 1.6481298516068756 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 41156 | 4780081 | 67391582 | 70.92994196218751 | 0.8609895941093885 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 12976 | 4880462 | 67391582 | 72.41946034150082 | 0.26587646825239086 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 40344 | 4867416 | 67391582 | 72.22587533261944 | 0.8288586798416243 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 46205 | 2474580 | 67391582 | 36.719422909525996 | 1.8671855425971278 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 43638 | 2268119 | 67391582 | 33.65582069285745 | 1.923973124866905 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 27657 | 3130413 | 67391582 | 46.451098298894365 | 0.8834936476432982 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 39714 | 3542094 | 67391582 | 52.559887969390594 | 1.121201187771979 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 13068 | 2071816 | 67391582 | 30.742949468080447 | 0.6307509933314541 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 39389 | 1600183 | 67391582 | 23.744553140182997 | 2.461530962396176 | TESSy COVID-19 |\n| France | FR | 2021-W43 | national | FR | France | 17297 | 2863145 | 67391582 | 42.485202380321034 | 0.6041258825522284 | TESSy COVID-19 |”,rn “description”:”This query returns all COVID-19 testing data for France in 2021, including new cases, tests done, and positivity rate.”,rn “query”:”give me all the coid 19 in France in 2021″rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702ceca1f0>)])]>
4. Table summarization
Building upon the data retrieved and organized into a markdown table, the system calls the “sqlsummary” model, powered by FlinkAI. This model is tasked with generating an easily understandable summary of the table’s data, tailored for audio delivery. It uses a sophisticated prompt mechanism, crucial for directing the AI’s text-generation capabilities. The prompt specifies that the summary should not only recite the data but also provide a coherent narrative about its context, trends, and significant points such as highs and lows.
<ListValue: [StructValue([(‘code’, “CREATE MODEL sqlsummaryrnINPUT(renderedResult STRING)rnOUTPUT(response STRING)rnCOMMENT ‘SQL Summary’rnWITH (rn ‘provider’ = ‘googleai’,rn ‘task’ = ‘text_generation’,rn ‘googleai.endpoint’ = ‘https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent’,rn ‘googleai.PARAMS.top_p’ = ‘0.95’,rn ‘googleai.PARAMS.top_k’ = ’64’,rn ‘googleai.PARAMS.temperature’ = ‘1’,rn ‘googleai.api_key’ = ‘{{sessionconfig/sql.secrets.gcp_key}}’,rn ‘googleai.system_prompt’ = ‘Using the information provided in the JSON document, create a detailed summary for the table in the `renderedResult` field. The summary should contain details of the data in the table and be clear, detailed, and suitable for an audio format.rnrnPlease generate a summary that includes the following elements:rn1. A brief overview of the context and source of the data.rn2. Details about ranges and typical values.rn3. Any notable highs and lows in new cases.rn4. General trends that can be observed from the data.rnrnEnsure that the summary is clear, detailed, and presented in a way that makes it suitable for being read out loud.rnrnBelow is the JSON document:’rn);”), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702cecae80>)])]>
<ListValue: [StructValue([(‘code’, ‘{rn “sessionId”:”bb76112c-3900-121d-d365-12bee9de421b”,rn “executedQuery”:”SELECT * FROM vaccine_data.covid_testing_data WHERE country = ‘France’ AND year_week LIKE ‘2021%’;”,rn “response”:”This data comes from the TESSy COVID-19 database and presents the COVID-19 testing data for France during the year 2021, specifically for week 43. \n\nOver this week, the number of new daily cases in France fluctuated. Daily new cases ranged from a low of 12,976 to a high of 46,205. The number of tests conducted each day also varied, ranging from approximately 1.6 million to nearly 4.9 million. This resulted in a testing rate, which represents the number of tests per 100,000 people, ranging from approximately 23.7 to 72.7. \n\nNotably, the positivity rate, which is the percentage of tests that came back positive, saw a significant high of around 2.46 on one of the days. This indicates a higher transmission rate during that time. \n\nOverall, while the testing rate remained relatively stable, the fluctuating number of new cases and positivity rate during week 43 in France suggests an evolving COVID-19 situation. \n”,rn “description”:”This query returns all COVID-19 testing data for France in 2021, including new cases, tests done, and positivity rate.”,rn “query”:”give me all the coid 19 in France in 2021″rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702dbc5e20>)])]>
5. Text-to-speech
The final stage in the data processing pipeline is the text-to-speech conversion, where the summarized text generated by the system is transformed into audible speech. This is accomplished using a KStream application, which takes the prepared text summaries and processes them through a text-to-speech service. This service is configured to deliver high-quality audio output that captures the essence of the data in a clear and engaging manner. The application helps ensure that the speech output is free of any formatting remnants from the text summary, for a clean and professional listening experience. Once the conversion is complete, the audio is then pushed to another topic within the system, to make it accessible for further use.
<ListValue: [StructValue([(‘code’, ‘/**rn * Processor that converts text to speech.rn */rn@Slf4jrnpublic class TextProcessor implements Processor<String, SQLResponse, String, byte[]> {rnrn private ProcessorContext<String, byte[]> context;rn private final TextContentRenderer renderer = TextContentRenderer.builder().build();rn private final Parser parser = Parser.builder().build();rnrn @Overridern public void init(ProcessorContext<String, byte[]> context) {rn this.context = context;rn }rnrn @Overridern public void process(Record<String, SQLResponse> record) {rn final SQLResponse sqlResponse = record.value();rnrn log.info(“Processing text for session id: {}”, sqlResponse.getSessionId());rnrn final TextToSpeechSettings settings;rn try {rn settings = TextToSpeechSettings.newBuilder().setEndpoint(“texttospeech.googleapis.com:443”).build();rn } catch (IOException e) {rn log.error(“Error creating TextToSpeechSettings.”, e);rn throw new RuntimeException(e);rn }rnrn try (TextToSpeechClient textToSpeechClient = TextToSpeechClient.create(settings)) {rn // Make sure the summary doesn’t include any markdown formattingrn Node document = parser.parse(sqlResponse.getResponse());rn final String renderedText = renderer.render(document);rnrn SynthesizeSpeechRequest request =rn SynthesizeSpeechRequest.newBuilder()rn .setInput(SynthesisInput.rn newBuilder()rn .setText(renderedText)rn .build())rn .setVoice(VoiceSelectionParams.newBuilder()rn .setLanguageCode(“en-US”)rn .setSsmlGender(SsmlVoiceGender.FEMALE)rn .build())rn .setAudioConfig(AudioConfig.newBuilder()rn .setAudioEncoding(AudioEncoding.MP3)rn .addEffectsProfileId(“telephony-class-application”)rn .build())rn .build();rnrn // Call Text-to-Speech APIrn SynthesizeSpeechResponse response = textToSpeechClient.synthesizeSpeech(request);rn final byte[] audio = response.getAudioContent().toByteArray();rnrn // Now push the audio response to the output topic rn context.forward(new Record<>(sqlResponse.getSessionId(), audio, record.timestamp()));rn } catch (Exception e) {rn log.error(“Error processing text for session id: {}”, sqlResponse.getSessionId(), e);rn throw new RuntimeException(e);rn }rn }rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3e702dbc5b50>)])]>
Outcomes and benefits
By leveraging LLMs with Confluent and Vertex AI, organizations unlock the full potential of their data and gain a competitive advantage, achieving:
Increased data accessibility: Empowering non-technical users to explore data through natural language
Improved data analysis efficiency: Automating routine SQL tasks, freeing up analyst time for higher-value activities
Enhanced data quality: Identifying and correcting errors in SQL queries through LLM analysis
Faster time-to-insights: Accelerating data exploration and analysis processes
Cost reduction: Optimizing query performance and reducing reliance on SQL experts
Confluent and Google Cloud: A powerful combination for AI-driven data
Together, Confluent and Google Cloud partner to help organizations harness the full potential of their data. By combining Confluent’s real-time data-streaming capabilities with Google Cloud’s robust infrastructure and AI services, including Vertex AI, businesses can create innovative solutions that drive growth and efficiency.
Key benefits of this collaboration for automating SQL query generation with LLMs include:
Real-time data foundation: Confluent ensures LLMs have access to the freshest data for accurate and relevant query generation.
Scalable AI infrastructure: Vertex AI provides the ideal platform for deploying and managing LLMs.
Data integration and enrichment: Confluent’s connectors and data processing capabilities enable seamless integration of diverse data sources.
Accelerated time-to-insights: By automating SQL query generation, businesses can expedite data exploration and analysis.
Improved decision making: Real-time insights derived from automated queries inform better business decisions.
Through this powerful combination, organizations can break down data silos, optimize data pipelines, and unlock the true value of their data assets.
Ready to unlock the full potential of your data?
Contact us to learn how Confluent and Google Cloud can help you build intelligent, data-driven applications. Begin experimenting with Vertex AI and Confluent Cloud on the Google Cloud Marketplace today!
*FlinkAI is currently in public preview as of this writing.
Read More for the details.