End-to-end Snowpipe Streaming HPA + AI demo for webinars. Sets up Snowpipe Streaming high-performance architecture (HPA) with background data generation, deploys a live Streamlit dashboard, then layers on a Semantic View and Cortex Agent so the presenter can do natural-language queries on live-streaming data in Snowsight. Triggers: snowpipe streaming ai webinar, snowpipe streaming webinar demo, snowpipe streaming ai demo, streaming ai demo, snowpipe streaming webinar, snowpipe streaming cortex agent demo, snowpipe streaming semantic view demo.
Use this skill when the user wants to:
This is not for a quick Snowpipe Streaming HPA tryout (use snowpipe-streaming-quickstart for that). This skill is specifically for a presentation-ready demo that showcases streaming + AI.
A fully automated, presentation-ready demo pipeline:
cortex_analyst_text_to_sql pointed at the semantic view
The high-performance architecture automatically creates a default pipe when data is first ingested into a table. No CREATE PIPE SQL is required.
The Python SDK references the default pipe using this naming convention:
<TABLE_NAME>-streaming
Important: Use a hyphen (-), not an underscore (_).
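A minimal sketch of the naming convention above, for anyone deriving the pipe name in code rather than typing it by hand. The helper name `default_pipe_name` is hypothetical and not part of the SDK; the table name is the default used elsewhere in this skill.

```python
def default_pipe_name(table_name: str) -> str:
    """Return the default pipe name for a table: <TABLE_NAME>-streaming."""
    # Hyphen, not underscore, between the table name and the suffix.
    return f"{table_name}-streaming"


PIPE = default_pipe_name("STREAMING_WEBINAR_USERS")
print(PIPE)
```

Deriving the name from the table name in one place keeps the hyphen rule from being retyped (and mistyped) in each script.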
Unlike the quickstart (which blocks for N minutes), this skill runs the streaming script in the background for 30 minutes. This lets the presenter immediately move on to building the AI layer while data keeps flowing. The wow factor: re-ask the same question later and the numbers have changed.
Steps 6 and 7 use direct CREATE SEMANTIC VIEW and CREATE AGENT SQL rather than invoking the bundled semantic-view and cortex-agent skills. This is because we know the exact table schema (it's always the same 12 columns) and want maximum speed for a live demo. The semantic view and agent are created in ~2 seconds each instead of minutes.
IMPORTANT EXECUTION GUIDELINES:
Announce each step clearly — Before executing each step, print a clear header like "Step X -- [Step Name]" so the user knows exactly where they are.
Batch commands aggressively to minimize permission prompts — Same batching rules as the quickstart:
- openssl genrsa... and the pubkey extraction in one single Bash call
- profile.json and streaming_demo.py as two parallel FileWrite calls
- run_in_background=true) as three parallel tool calls
Use parallel tool calls whenever operations are independent.
Log all SQL to streaming_demo_sql.log — Every SQL execution gets appended to a local log file with step headers and timestamps.
Do NOT ask about demo duration — This skill always runs streaming for 30 minutes in the background. Skip that question from the quickstart preferences.
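The SQL-logging guideline above could be implemented along these lines. This is only a sketch: the function name `log_sql`, the header layout, and the UTC timestamp format are assumptions, since the skill only says that each entry needs a step header and a timestamp.

```python
from datetime import datetime, timezone
from pathlib import Path


def log_sql(step: str, sql: str, path: str = "streaming_demo_sql.log") -> str:
    """Append one SQL statement to the log under a step header with a timestamp."""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    # The header is written as a SQL comment so the log stays valid SQL.
    entry = f"-- {step} [{stamp}]\n{sql.strip()}\n\n"
    with Path(path).open("a", encoding="utf-8") as log_file:
        log_file.write(entry)
    return entry
```

A call such as `log_sql("Step 3 -- Create objects", "CREATE DATABASE IF NOT EXISTS ...;")` appends one entry and leaves earlier entries untouched.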
Before doing anything, confirm the user wants to run the full webinar demo:
"This will run the Snowpipe Streaming AI Webinar Demo -- a fully automated pipeline that:
- Creates a demo database, schema, table, user, and role in Snowflake
- Generates RSA keys and a Python virtual environment locally
- Deploys a live Streamlit dashboard to monitor data in real-time
- Starts streaming fake user data in the background (runs for 30 min)
- Creates a Semantic View on the streaming data for natural-language querying
- Creates a Cortex Agent you can interact with in Snowsight
Setup takes ~5 minutes. After that, you drive the live demo in Snowsight. Want to proceed?"
If the user says no, stop gracefully.
Purpose: Verify system requirements and understand the current Snowflake context.
Execute these two tool calls in parallel:
Tool call 1 -- Bash (platform checks + initialize SQL log):
echo "=== OS ==="; uname -s 2>/dev/null || echo "WINDOWS"; echo "=== Python ==="; python3 --version 2>/dev/null || python --version 2>/dev/null || echo "NOT FOUND"; echo "=== Working Directory ==="; pwd; echo "=== Home Directory ==="; echo "$HOME"; echo "-- Snowpipe Streaming AI Webinar SQL Log" > streaming_demo_sql.log; echo "-- Generated by Cortex Code Snowpipe Streaming AI Webinar Skill" >> streaming_demo_sql.log; echo "" >> streaming_demo_sql.log
Tool call 2 -- SQL (Snowflake context):
SELECT
CURRENT_USER() AS current_user,
CURRENT_ROLE() AS current_role,
CURRENT_DATABASE() AS current_database,
CURRENT_SCHEMA() AS current_schema,
CURRENT_WAREHOUSE() AS current_warehouse,
CURRENT_ACCOUNT() AS current_account;
Interpret the OS result:
- Darwin -> macOS
- Linux -> Linux (also check glibc >= 2.26: ldd --version 2>&1 | head -1)
- WINDOWS or MINGW* or MSYS* or CYGWIN* -> Windows (experimental -- warn user, recommend WSL2)
Error handling -- stop if:
- CURRENT_WAREHOUSE() is NULL
Store the working Python command (python3 or python) and all Snowflake context values.
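The OS interpretation rules above can be sketched as two small helpers. Both function names are hypothetical, and the glibc parser assumes the version is the last token on the first line of `ldd --version`, which is the usual layout but not guaranteed on every distribution.

```python
import re


def classify_os(uname_s: str) -> str:
    """Map the output of `uname -s` (or the WINDOWS fallback) to a platform label."""
    name = uname_s.strip()
    if name == "Darwin":
        return "macOS"
    if name == "Linux":
        return "Linux"
    if name == "WINDOWS" or name.startswith(("MINGW", "MSYS", "CYGWIN")):
        return "Windows"  # experimental: warn the user and recommend WSL2
    return "unknown"


def glibc_at_least(ldd_first_line: str, minimum: tuple = (2, 26)) -> bool:
    """Return True if the trailing version on the ldd line is >= minimum."""
    match = re.search(r"(\d+)\.(\d+)\s*$", ldd_first_line)
    if match is None:
        return False  # unparseable output is treated as a failed check
    return (int(match.group(1)), int(match.group(2))) >= minimum
```

Comparing versions as integer tuples avoids the string-comparison trap where "2.9" would sort after "2.26".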
Ask the user:
- STREAMING_WEBINAR_DB.STREAMING_SCHEMA)
- STREAMING_WEBINAR_USERS)
Store the dashboard preference as a boolean (DEPLOY_STREAMLIT). If the user declines, Step 5 will skip the dashboard deployment and only start the background streaming.
Do NOT ask about demo duration -- this skill always streams for 30 minutes in the background.
Purpose: The Snowpipe Streaming SDK uses key-pair (JWT) authentication.
IMPORTANT — NEVER display private or public key content to the user. All key material must stay hidden from the conversation output.
Run key generation in one single Bash call with all output suppressed:
openssl genrsa 2048 2>/dev/null | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt 2>/dev/null && chmod 600 rsa_key.p8 && openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 2>/dev/null && echo "RSA key-pair generated successfully (rsa_key.p8 + rsa_key.pub)"
Then silently read rsa_key.pub using the Read tool. Do NOT echo, cat, or print the key content in any Bash command or in your response to the user. Internally strip the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- header/footer lines and join the remaining lines into a single string. Store that value for use in Step 3's ALTER USER ... SET RSA_PUBLIC_KEY statement.
What to tell the user: "RSA key-pair generated." — nothing more. Do not show any key material.
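The header/footer stripping described above amounts to the following. This is a sketch with a hypothetical function name, and the sample input is a dummy placeholder rather than real key material; in keeping with the rule above, the result is stored and never printed.

```python
def public_key_body(pem_text: str) -> str:
    """Drop the BEGIN/END marker lines and join the base64 lines into one string."""
    lines = [line.strip() for line in pem_text.splitlines()]
    # Marker lines start with five dashes; blank lines are skipped too.
    body = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body)


# Dummy content for illustration only; a real rsa_key.pub has many more lines.
sample = "-----BEGIN PUBLIC KEY-----\nAAAA\nBBBB\n-----END PUBLIC KEY-----\n"
pubk_value = public_key_body(sample)
```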
Purpose: Create all Snowflake objects in one SQL call.
IMPORTANT — Key material in SQL: The ALTER USER ... SET RSA_PUBLIC_KEY statement contains the public key value. When logging this SQL to streaming_demo_sql.log, replace the actual key value with <REDACTED>. When presenting SQL output to the user, do NOT display the full ALTER USER statement — just confirm it succeeded.
CREATE DATABASE IF NOT EXISTS <DATABASE>;
CREATE SCHEMA IF NOT EXISTS <DATABASE>.<SCHEMA>;
USE DATABASE <DATABASE>;
USE SCHEMA <SCHEMA>;
CREATE OR REPLACE TABLE <DATABASE>.<SCHEMA>.<TABLE_NAME> (
user_id INTEGER,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(255),
phone_number VARCHAR(50),
address VARCHAR(500),
date_of_birth DATE,
registration_date TIMESTAMP_NTZ,
city VARCHAR(100),
state VARCHAR(100),
country VARCHAR(100),
order_amount NUMBER(10,2)
);
CREATE ROLE IF NOT EXISTS STREAMING_DEMO_ROLE;
CREATE USER IF NOT EXISTS STREAMING_DEMO_USER DEFAULT_ROLE = STREAMING_DEMO_ROLE;
GRANT ROLE STREAMING_DEMO_ROLE TO USER STREAMING_DEMO_USER;
ALTER USER STREAMING_DEMO_USER SET RSA_PUBLIC_KEY = '<PUBK_VALUE>';
GRANT USAGE ON DATABASE <DATABASE> TO ROLE STREAMING_DEMO_ROLE;
GRANT USAGE ON SCHEMA <DATABASE>.<SCHEMA> TO ROLE STREAMING_DEMO_ROLE;
GRANT USAGE ON WAREHOUSE <WAREHOUSE> TO ROLE STREAMING_DEMO_ROLE;
-- Grant SELECT before transferring ownership; COPY CURRENT GRANTS preserves it.
GRANT SELECT ON TABLE <DATABASE>.<SCHEMA>.<TABLE_NAME> TO ROLE <CURRENT_ROLE>;
GRANT OWNERSHIP ON TABLE <DATABASE>.<SCHEMA>.<TABLE_NAME> TO ROLE STREAMING_DEMO_ROLE COPY CURRENT GRANTS;
DESC USER STREAMING_DEMO_USER;
Look for RSA_PUBLIC_KEY in the DESC USER output to confirm it was set.
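The redaction rule for the ALTER USER statement could be applied just before the SQL is written to streaming_demo_sql.log. The sketch below is one way to do it; the function name and the regular expression are assumptions, and it only covers the single-quoted `RSA_PUBLIC_KEY = '...'` form used in this step.

```python
import re

# Matches RSA_PUBLIC_KEY = '<anything without a quote>' and captures the left-hand side.
_KEY_ASSIGNMENT = re.compile(r"(RSA_PUBLIC_KEY\s*=\s*)'[^']*'", re.IGNORECASE)


def redact_public_key(sql: str) -> str:
    """Replace the public key value with <REDACTED>, leaving other SQL untouched."""
    return _KEY_ASSIGNMENT.sub(r"\1'<REDACTED>'", sql)
```

Statements that do not set a key pass through unchanged, so the same function can wrap every log write.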
FileWrite 1 -- profile.json:
{
  "user": "STREAMING_DEMO_USER",
  "account": "<ACCOUNT_IDENTIFIER>",
  "url": "https://<ACCOUNT_IDENTIFIER>.snowflakecomputing.com:443",
  "private_key_file": "rsa_key.p8",
  "role": "STREAMING_DEMO_ROLE"
}
FileWrite 2 -- streaming_demo.py:
Use the demo script from the "Demo script reference" section below. Key difference from quickstart: DEMO_MINUTES = 30 (runs for 30 minutes to keep data flowing throughout the webinar).
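For reference, the profile.json shown earlier in this step can also be produced programmatically, which keeps the account identifier and the URL in sync. This is a sketch only: `build_profile` is a hypothetical helper, and the placeholder account value must be replaced with the one returned by CURRENT_ACCOUNT().

```python
import json


def build_profile(account_identifier: str) -> dict:
    """Build the profile.json content; the key is referenced by file path, never inlined."""
    return {
        "user": "STREAMING_DEMO_USER",
        "account": account_identifier,
        "url": f"https://{account_identifier}.snowflakecomputing.com:443",
        "private_key_file": "rsa_key.p8",
        "role": "STREAMING_DEMO_ROLE",
    }


profile_text = json.dumps(build_profile("<ACCOUNT_IDENTIFIER>"), indent=2)
```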
<PYTHON_CMD> -m venv streaming_venv && source streaming_venv/bin/activate && pip install --upgrade pip && pip install snowpipe-streaming faker && python -c "from snowflake.ingest.streaming import StreamingIngestClient; print('SDK OK')" && python -c "from faker import Faker; print('Faker OK')"
Demo script reference -- streaming_demo.py:
import time
import os
import random
from faker import Faker

# Set the SDK log level before importing the SDK
os.environ["SS_LOG_LEVEL"] = "warn"
from snowflake.ingest.streaming import StreamingIngestClient

fake = Faker()

# --- Configuration (auto-populated by Cortex Code) ---
BATCH_SIZE = 5
DEMO_MINUTES = 30  # Runs in background for the full webinar
NUM_BATCHES = DEMO_MINUTES * 120  # 120 batches per minute (5 rows every 0.5s)
DATABASE = "<DATABASE>"
SCHEMA = "<SCHEMA>"
# Default auto-created pipe: <TABLE_NAME>-streaming (hyphen, not underscore)
PIPE = "<TABLE_NAME>-streaming"
PROFILE_JSON_PATH = "profile.json"

# --- Initialize Streaming Client ---
print(f"Connecting to Snowflake...")
print(f"  Database: {DATABASE}")
print(f"  Schema: {SCHEMA}")
print(f"  Pipe: {PIPE} (default auto-created pipe)")
try:
    client = StreamingIngestClient(
        "STREAMING_WEBINAR_CLIENT",
        DATABASE,
        SCHEMA,
        PIPE,
        profile_json=PROFILE_JSON_PATH,
        properties=None,
    )
except Exception as e:
    print(f"\n[ERROR] Failed to create StreamingIngestClient:")
    print(f"  {e}")
    raise SystemExit(1)

# --- Open channel ---
print(f"\nOpening channel...")
try:
    channel, status = client.open_channel("STREAMING_WEBINAR_CHANNEL")
    print(f"  Channel: {status.channel_name}")
    print(f"  Status: {status.status_code}")
except Exception as e:
    print(f"\n[ERROR] Failed to open channel:")
    print(f"  {e}")
    client.close()
    raise SystemExit(1)

# --- Stream fake user data in batches ---
total_rows = BATCH_SIZE * NUM_BATCHES
print(f"\nStreaming {total_rows} rows ({NUM_BATCHES} batches of {BATCH_SIZE}) over ~{DEMO_MINUTES} minute(s)...")
print(f"Data is flowing in the background. Move on to the AI setup!\n")
errors = []
row_id = 0
for batch in range(1, NUM_BATCHES + 1):
    for _ in range(BATCH_SIZE):
        row_id += 1
        row = {
            "user_id": row_id,
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "email": fake.email(),
            "phone_number": fake.phone_number(),
            "address": fake.address().replace("\n", ", "),
            "date_of_birth": fake.date_of_birth(minimum_age=18, maximum_age=80).isoformat(),
            "registration_date": fake.date_time_this_year().isoformat(),
            "city": fake.city(),
            "state": fake.state(),
            "country": fake.country(),
            "order_amount": round(random.uniform(1, 100), 2),
        }
        try:
            channel.append_row(row, offset_token=str(row_id))
        except Exception as e:
            errors.append((row_id, str(e)))
            if len(errors) >= 5:
                print(f"\n[ERROR] Too many row errors ({len(errors)}). Stopping.")
                break
    if len(errors) >= 5:
        break
    if batch % 120 == 0:  # Progress update every minute
        print(f"  [producer] batch {batch}/{NUM_BATCHES} (row {row_id})")
    time.sleep(0.5)

if errors:
    print(f"\n[WARNING] {len(errors)} rows failed to send:")
    for offset, err in errors[:5]:
        print(f"  Row {offset}: {err}")

# --- Wait for all data to be committed ---
print(f"\n[producer] Waiting for commits to reach offset {row_id}...")
try:
    channel.wait_for_commit(
        lambda token: token is not None and int(token) >= row_id,
        timeout_seconds=120
    )
    print("All rows committed.")
except Exception as e:
    print(f"\n[WARNING] Commit wait timed out or failed: {e}")
    print("Some rows may still be in flight. Check the table directly.")

# --- Display channel status ---
s = channel.get_channel_status()
print(f"\nChannel status:")
print(f"  Channel: {s.channel_name}")
print(f"  Committed offset: {s.latest_committed_offset_token}")
print(f"  Rows inserted: {s.rows_inserted_count}")
print(f"  Rows errored: {s.rows_error_count}")
print(f"  Avg server latency: {s.server_avg_processing_latency}")
print(f"  Last error message: {s.last_error_message}")

# --- Cleanup ---
print(f"\nFinal committed offset: {channel.get_latest_committed_offset_token()}")
channel.close()
client.close()
print("\nStreaming complete! (Background process ending)")
Purpose: Start the background streaming script, and optionally deploy the live monitoring dashboard. If DEPLOY_STREAMLIT is false, skip straight to starting the streaming script only.
Run only the streaming script in the background:
source streaming_venv/bin/activate && python streaming_demo.py
Use run_in_background=true. Tell the user:
"Data is now streaming in the background (~10 rows/second for 30 minutes). Skipping dashboard -- moving on to the AI layer."
Immediately proceed to Step 6.
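The "~10 rows/second for 30 minutes" figure follows directly from the script's constants. The arithmetic is worked through below using the values from the demo script; `SLEEP_SECONDS` is a name introduced here for the script's `time.sleep(0.5)` interval.

```python
BATCH_SIZE = 5          # rows appended per batch
SLEEP_SECONDS = 0.5     # pause after each batch
DEMO_MINUTES = 30

batches_per_minute = int(60 / SLEEP_SECONDS)      # 120
num_batches = DEMO_MINUTES * batches_per_minute   # 3600
rows_per_second = BATCH_SIZE / SLEEP_SECONDS      # 10.0
total_rows = BATCH_SIZE * num_batches             # 18000

print(f"{rows_per_second:.0f} rows/s, {total_rows} rows in {num_batches} batches")
```

Because the script sleeps for the full 0.5 seconds after generating and appending each batch, the real rate is slightly below 10 rows/second and the run takes a little longer than 30 minutes.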
The template lives at ~/.snowflake/cortex/skills/snowpipe-streaming-ai-webinar/streamlit_app.py.template. Check if it exists using the Read tool. If it exists, proceed to 5b. If it doesn't exist, fall back to writing the full Streamlit app inline (see the Templates section at the end of this skill for the full source).
Execute these three tool calls in parallel:
Tool call 1 -- SQL (create stage):
CREATE STAGE IF NOT EXISTS <DATABASE>.<SCHEMA>.STREAMING_STREAMLIT_STAGE
  DIRECTORY = (ENABLE = TRUE);
Tool call 2 -- Bash (copy template and substitute placeholders):
sed -e 's/{{DATABASE}}/<DATABASE>/g' -e 's/{{SCHEMA}}/<SCHEMA>/g' -e 's/{{TABLE_NAME}}/<TABLE_NAME>/g' ~/.snowflake/cortex/skills/snowpipe-streaming-ai-webinar/streamlit_app.py.template > <LOCAL_PATH>/streamlit_app.py
Replace <DATABASE>, <SCHEMA>, <TABLE_NAME> in the sed command with the actual values collected in Step 1.
Tool call 3 -- Bash (start streaming in background with run_in_background=true):
source streaming_venv/bin/activate && python streaming_demo.py
IMPORTANT: Use the Bash tool with run_in_background=true for tool call 3 so it does not block. The script will run for 30 minutes.
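Where sed is unavailable (for example on native Windows), the placeholder substitution in tool call 2 can be done in Python instead. This is a sketch, not the skill's method: `render_template` is a hypothetical helper that handles the same three `{{...}}` placeholders, and the template string is a made-up one-liner.

```python
def render_template(template: str, values: dict) -> str:
    """Replace each {{KEY}} placeholder with its value using plain string replacement."""
    rendered = template
    for key, value in values.items():
        rendered = rendered.replace("{{" + key + "}}", value)
    return rendered


# Illustrative template text; the real template file is streamlit_app.py.template.
example = render_template(
    "SELECT COUNT(*) FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE_NAME}}",
    {
        "DATABASE": "STREAMING_WEBINAR_DB",
        "SCHEMA": "STREAMING_SCHEMA",
        "TABLE_NAME": "STREAMING_WEBINAR_USERS",
    },
)
print(example)
```

Plain string replacement also sidesteps sed's special handling of `/` and `&` in replacement values.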
After 5b completes (stage created + streamlit file prepared), upload the file:
snow stage copy <LOCAL_PATH>/streamlit_app.py @<DATABASE>.<SCHEMA>.STREAMING_STREAMLIT_STAGE --overwrite
CREATE OR REPLACE STREAMLIT <DATABASE>.<SCHEMA>.STREAMING_MONITOR
  ROOT_LOCATION = '@<DATABASE>.<SCHEMA>.STREAMING_STREAMLIT_STAGE'
  MAIN_FILE = 'streamlit_app.py'
  QUERY_WAREHOUSE = '<WAREHOUSE>';
SHOW STREAMLITS IN SCHEMA <DATABASE>.<SCHEMA>;
"Your real-time streaming dashboard is ready and data is already streaming in the background!
To access the dashboard:
- Open Snowsight in your browser
- Navigate to Projects > Streamlit in the left sidebar
- Find and click STREAMING_MONITOR
You should see rows appearing within 5-10 seconds. While you explore the dashboard, I'll set up the AI layer."
Do NOT wait for the user to confirm the dashboard loaded. Immediately proceed to Step 6.
Purpose: Create a semantic view on the streaming table so Cortex Analyst can answer natural-language questions about the live data. We use direct SQL here (not the semantic-view skill) because we know the exact table schema and want maximum speed for the live demo.
First, verify that some data has landed:
SELECT COUNT(*) AS row_count FROM <DATABASE>.<SCHEMA>.<TABLE_NAME>;
If row_count is 0, wait 10 seconds and retry (up to 3 times). If still 0, warn the user and troubleshoot.
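The retry rule above can be sketched as a small loop. The sketch reads "up to 3 times" as one initial check plus at most three retries, which is an interpretation. `count_rows` is a hypothetical callable standing in for whatever runs the COUNT(*) query, and `sleep` is injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable


def wait_for_rows(
    count_rows: Callable[[], int],
    retries: int = 3,
    delay_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Return the first non-zero row count, or 0 if every retry still sees no rows."""
    count = count_rows()
    for _ in range(retries):
        if count > 0:
            break
        sleep(delay_seconds)  # give the background script time to land rows
        count = count_rows()
    return count
```

A return value of 0 is the signal to warn the user and start troubleshooting rather than creating the semantic view on an empty table.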
Run this single SQL call:
CREATE OR REPLACE SEMANTIC VIEW <DATABASE>.<SCHEMA>.STREAMING_ANALYTICS
TABLES (
users AS <DATABASE>.<SCHEMA>.<TABLE_NAME>
PRIMARY KEY (user_id)
WITH SYNONYMS = ('registrations', 'customers', 'orders')
COMMENT = 'Real-time user registrations and orders streaming in via Snowpipe Streaming HPA'
)
FACTS (
users.order_amount AS order_amount
COMMENT = 'Dollar amount of the order placed at registration',
users.user_id_value AS user_id
COMMENT = 'Unique identifier for each registered user'
)
DIMENSIONS (
users.first_name AS first_name
COMMENT = 'First name of the registered user',
users.last_name AS last_name
COMMENT = 'Last name of the registered user',
users.email AS email
COMMENT = 'Email address of the registered user',
users.phone_number AS phone_number
COMMENT = 'Phone number of the registered user',
users.city AS city
WITH SYNONYMS = ('town')
COMMENT = 'City where the user is located',
users.state AS state
WITH SYNONYMS = ('province', 'region')
COMMENT = 'State or province where the user is located',
users.country AS country
WITH SYNONYMS = ('nation')
COMMENT = 'Country where the user is located',
users.registration_date AS registration_date
WITH SYNONYMS = ('signup date', 'sign up date', 'registered at')
COMMENT = 'Timestamp when the user registered',
users.registration_day AS DATE_TRUNC('day', registration_date)
WITH SYNONYMS = ('signup day')
COMMENT = 'Day the user registered (date only)',
users.registration_hour AS DATE_TRUNC('hour', registration_date)
COMMENT = 'Hour the user registered',
users.date_of_birth AS date_of_birth
WITH SYNONYMS = ('birthday', 'dob', 'birth date')
COMMENT = 'Date of birth of the user',
users.age AS DATEDIFF('year', date_of_birth, CURRENT_DATE())
WITH SYNONYMS = ('years old')
COMMENT = 'Current age of the user in years',
users.age_group AS
CASE
WHEN DATEDIFF('year', date_of_birth, CURRENT_DATE()) < 25 THEN '18-24'
WHEN DATEDIFF('year', date_of_birth, CURRENT_DATE()) < 35 THEN '25-34'
WHEN DATEDIFF('year', date_of_birth, CURRENT_DATE()) < 45 THEN '35-44'
WHEN DATEDIFF('year', date_of_birth, CURRENT_DATE()) < 55 THEN '45-54'
WHEN DATEDIFF('year', date_of_birth, CURRENT_DATE()) < 65 THEN '55-64'
ELSE '65+'
END
WITH SYNONYMS = ('age bracket', 'age range', 'age decade', 'demographic')
COMMENT = 'Age group bucket for the user'
)
METRICS (
users.total_users AS COUNT(users.user_id_value)
WITH SYNONYMS = ('user count', 'number of users', 'registrations', 'how many users')
COMMENT = 'Total number of registered users',
users.total_revenue AS SUM(users.order_amount)
WITH SYNONYMS = ('total sales', 'total orders', 'revenue', 'sales', 'income')
COMMENT = 'Total revenue from all orders',
users.avg_order_value AS AVG(users.order_amount)
WITH SYNONYMS = ('average order', 'average spend', 'avg order size', 'mean order')
COMMENT = 'Average order amount per user',
users.max_order AS MAX(users.order_amount)
WITH SYNONYMS = ('largest order', 'biggest order', 'highest order')
COMMENT = 'Largest single order amount',
users.min_order AS MIN(users.order_amount)
WITH SYNONYMS = ('smallest order', 'lowest order')
COMMENT = 'Smallest single order amount',
users.unique_countries AS COUNT(DISTINCT users.country)
WITH SYNONYMS = ('number of countries', 'country count', 'how many countries')
COMMENT = 'Number of distinct countries represented',
users.unique_cities AS COUNT(DISTINCT users.city)
WITH SYNONYMS = ('number of cities', 'city count')
COMMENT = 'Number of distinct cities represented',
users.unique_states AS COUNT(DISTINCT users.state)
WITH SYNONYMS = ('number of states', 'state count')
COMMENT = 'Number of distinct states represented'
)
COMMENT = 'Semantic view for real-time streaming user registration and order data ingested via Snowpipe Streaming HPA. Data arrives continuously -- counts and totals grow over time.'
AI_SQL_GENERATION 'This data represents real-time user registrations with associated orders streaming via Snowpipe Streaming HPA. Each row is one user with one order. The data is continuously growing as new rows arrive every few seconds. When asked about trends over time, use the registration_date dimension. When asked about demographics, use the age_group dimension. When asked about geography, use country, state, or city dimensions. For revenue questions, use the total_revenue metric (SUM of order_amount). For user count questions, use the total_users metric (COUNT of user_id). For average order questions, use the avg_order_value metric. The order_amount column stores dollar amounts as NUMBER(10,2). Always use metric and dimension names defined in this semantic view rather than raw column names.';
SHOW SEMANTIC VIEWS IN SCHEMA <DATABASE>.<SCHEMA>;
Tell the user:
"Semantic view STREAMING_ANALYTICS created. Cortex Analyst can now answer natural-language questions about the streaming data."
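To make the age and age_group dimensions in the semantic view concrete, here is a Python rendering of the same bucketing. It is a sketch for illustration; `age_group` is a hypothetical function, and it deliberately mirrors DATEDIFF('year', ...), which subtracts the year parts only, so the computed age can be one year higher than the age at the last birthday.

```python
from datetime import date

# Upper bounds (exclusive) and labels, in the same order as the CASE expression.
_BUCKETS = ((25, "18-24"), (35, "25-34"), (45, "35-44"), (55, "45-54"), (65, "55-64"))


def age_group(date_of_birth: date, today: date) -> str:
    """Bucket a user by year difference, matching the view's CASE logic."""
    age = today.year - date_of_birth.year  # year-part difference, like DATEDIFF('year', ...)
    for upper_bound, label in _BUCKETS:
        if age < upper_bound:
            return label
    return "65+"
```

For example, a user born on 2000-12-31 is placed in '25-34' on 2025-01-01, even though their 25th birthday is still almost a year away. For a demo on synthetic data this approximation is usually acceptable.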
Purpose: Create a Cortex Agent that uses the semantic view to answer natural-language questions. We use direct CREATE AGENT SQL for speed.
Run this single SQL call:
CREATE OR REPLACE AGENT <DATABASE>.<SCHEMA>.STREAMING_AGENT
  COMMENT = 'AI agent for exploring real-time streaming user and order data via Snowpipe Streaming HPA'
  FROM SPECIFICATION $$
models:
  orchestration: auto
instructions:
  system: >
    You are a streaming data analyst. You help explore real-time user registration
    and e-commerce order data that is being ingested via Snowpipe Streaming HPA.
    The data includes user demographics (name, email, city, state, country, date of birth),
    registration timestamps, and order amounts. New data arrives every few seconds,
    so counts and totals are continuously growing.
  response: >
    Answer questions about the streaming user and order data concisely.
    Include relevant numbers and trends. When showing geographic data,
    mention both the top entries and the total count. Format currency with
    dollar signs and two decimal places. Data is streaming in real-time,
    so counts and totals are continuously growing.
tools:
  - tool_spec:
      type: cortex_analyst_text_to_sql
      name: StreamingAnalytics
      description: >
        Query real-time streaming user registration and order data.
        Use this tool for questions about users, revenue, orders,
        geographic distribution, demographics, and trends over time.
tool_resources:
  StreamingAnalytics:
    semantic_view: "<DATABASE>.<SCHEMA>.STREAMING_ANALYTICS"
$$;
IMPORTANT: Replace <DATABASE>, <SCHEMA> in the semantic_view value inside the YAML spec with the actual database and schema names (fully qualified).
SHOW AGENTS IN SCHEMA <DATABASE>.<SCHEMA>;
Tell the user:
"Cortex Agent STREAMING_AGENT created and pointed at the semantic view. You can interact with it in Snowsight or ask questions here."
Purpose: Prove everything works by running 3-4 live queries against the semantic view. The presenter sees real results from the streaming data right in the terminal -- instant "wow" moment.
SELECT * FROM SEMANTIC_VIEW(
<DATABASE>.<SCHEMA>.STREAMING_ANALYTICS
METRICS users.total_users, users.total_revenue, users.avg_order_value
);
Tell the user the results. Note that these numbers are growing in real-time.
SELECT * FROM SEMANTIC_VIEW(
<DATABASE>.<SCHEMA>.STREAMING_ANALYTICS
DIMENSIONS users.country
METRICS users.total_revenue, users.total_users
)
ORDER BY total_revenue DESC
LIMIT 5;
SELECT * FROM SEMANTIC_VIEW(
<DATABASE>.<SCHEMA>.STREAMING_ANALYTICS
DIMENSIONS users.age_group
METRICS users.total_users, users.avg_order_value
)
ORDER BY avg_order_value DESC;
SELECT * FROM SEMANTIC_VIEW(
<DATABASE>.<SCHEMA>.STREAMING_ANALYTICS
DIMENSIONS users.registration_hour
METRICS users.total_users, users.total_revenue
)
ORDER BY registration_hour;
After running the queries, tell the user:
"All 4 queries returned live results from streaming data. The semantic view and agent are working.
Try re-running any of these in a minute -- the numbers will have changed because data is still streaming in.
Ready for the live demo portion?"
Purpose: Everything is set up. Hand control to the presenter for the live demo.
Present this to the user:
Setup Complete! Your Snowpipe Streaming AI Webinar Demo is Ready.
You now have:
- Streaming pipeline -- Data flowing into <DATABASE>.<SCHEMA>.<TABLE_NAME> (~10 rows/second)
- Live dashboard -- Streamlit app STREAMING_MONITOR showing real-time metrics
- Semantic view -- STREAMING_ANALYTICS for natural-language querying
- Cortex Agent -- STREAMING_AGENT ready for conversational analytics
Demo flow for your presentation:
Show the Streamlit dashboard -- Point out rows arriving in real-time, cumulative revenue growing, countries populating
Open Cortex Agent in Snowsight:
- Navigate to AI & ML > Cortex Agents (or Snowflake Intelligence > Agents)
- Find and open STREAMING_AGENT
- Try questions like "How many users have registered?" or "What is revenue by country?"
Key demo moments:
- Ask "How many users have registered so far?" -- note the number
- Show a geographic breakdown: "What's the total revenue by country?"
- Wait 30-60 seconds, re-ask "How many users now?" -- the number has grown! (live data)
- Try an ad-lib question: "What's the average order value for users in California?"
Toggle between dashboard and agent to show the full picture: real-time monitoring + AI-powered analysis on the same live data
The streaming script runs for ~30 minutes. You have plenty of time for the demo.
When you're done, say "clean up" and I'll tear everything down.
Purpose: Remove all demo assets from Snowflake.
First, check if the streaming script is still running. If it is, kill the background process.
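The check-then-kill step above could look like this if the streaming script had been launched from Python. That is an assumption: the skill starts it through the Bash tool, so the process handle would come from that tool instead. `stop_if_running` is a hypothetical helper built on the standard `subprocess` API.

```python
import subprocess
import sys


def stop_if_running(proc: subprocess.Popen, timeout: float = 5.0) -> bool:
    """Terminate the process if it is still alive; return True if it had to be stopped."""
    if proc.poll() is not None:
        return False  # already exited on its own
    proc.terminate()  # polite stop request first
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()   # force stop if it ignores the request
        proc.wait()
    return True


if __name__ == "__main__":
    # Stand-in for streaming_demo.py: a child process that sleeps for a minute.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    print("stopped:", stop_if_running(child))
```

Stopping the producer before dropping Snowflake objects avoids a burst of append errors from a script whose table no longer exists.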
"The demo is complete! How would you like to handle cleanup?
- Clean up Snowflake objects, keep local files (default) -- Drops agent, semantic view, demo user, role, Streamlit app, stage, database/schema. Keeps local files for review.
- Clean up everything -- Drops Snowflake objects AND deletes local files.
- Keep everything -- Leave all objects and files in place."
If option 1 (default) or option 2, run the Snowflake cleanup SQL:
-- AI layer
DROP AGENT IF EXISTS <DATABASE>.<SCHEMA>.STREAMING_AGENT;
DROP SEMANTIC VIEW IF EXISTS <DATABASE>.<SCHEMA>.STREAMING_ANALYTICS;
-- Streamlit app and stage
DROP STREAMLIT IF EXISTS <DATABASE>.<SCHEMA>.STREAMING_MONITOR;
DROP STAGE IF EXISTS <DATABASE>.<SCHEMA>.STREAMING_STREAMLIT_STAGE;
-- Schema cascade drops table and auto-created pipe
DROP SCHEMA IF EXISTS <DATABASE>.<SCHEMA>;
-- Demo user and role
DROP USER IF EXISTS STREAMING_DEMO_USER;
DROP ROLE IF EXISTS STREAMING_DEMO_ROLE;
-- Database (only if created for this demo)
DROP DATABASE IF EXISTS <DATABASE>;
If option 2, also run local cleanup:
deactivate 2>/dev/null; rm -rf streaming_venv; rm -f rsa_key.p8 rsa_key.pub profile.json streaming_demo.py streamlit_app.py streaming_demo_sql.log
After cleanup, summarize what was removed.
- rsa_key.p8 file stays local and is cleaned up after.
- private_key_file in profile.json (path to .p8 file), not inline key content.
- <TABLE_NAME>-streaming.
User: "snowpipe streaming ai webinar" -> Run full flow Steps 0-9. Set up streaming, dashboard, semantic view, agent, and run showcase queries.
User: "clean up" (after demo) -> Run Step 10 cleanup.
User: "I just want the quickstart without the AI stuff"
-> Redirect to the snowpipe-streaming-quickstart skill instead.
CREATE OR REPLACE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE_NAME}} (
user_id INTEGER,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(255),
phone_number VARCHAR(50),
address VARCHAR(500),
date_of_birth DATE,
registration_date TIMESTAMP_NTZ,
city VARCHAR(100),
state VARCHAR(100),
country VARCHAR(100),
order_amount NUMBER(10,2)
);
No CREATE PIPE needed. The SDK references:
{{TABLE_NAME}}-streaming