Implementing Real-Time Event Stream Processing with KQL in Fabric

If you've been following along with our series, you've already mastered the KQL basics: creating KQL databases, querying data, and building visualizations. That's fantastic! You now have a solid foundation.
Missed the previous article? I'd highly recommend reading it first: "Mastering KQL Databases in Microsoft Fabric". It covers everything from database creation to writing queries and creating charts. You'll want that knowledge before diving into today's topics.
In today's blog, we’re moving from static data to real-time streaming, where data flows continuously into your systems like water through pipes.
By the end of this article, you'll know how to:
Get Data From Multiple Sources
Create Event Streams
Route Streaming Data to KQL Databases
Process Events in Real-Time
Connect KQL to Power BI
Stream into Lakehouses
Work with Real-Time Data in Notebooks
Manage Data Retention
We're building a complete real-time analytics solution from scratch. Let's get started!
Understanding Event Streams
Before we dive into the technical steps, let me explain what event streams are in simple terms.
Imagine you own a chain of retail stores. Every second, thousands of things are happening:
Customers are making purchases
Products are being scanned at checkout
Inventory is being updated
Security cameras are detecting movement
Traditional systems wait, collect all this data, and process it later, maybe once a day. That's like reading yesterday's newspaper to understand today's news.
Event streams work differently. They capture these activities the instant they happen and let you analyze them immediately.
It's like watching live news instead of reading yesterday's paper. This is what we call "real-time analytics."
Microsoft Fabric's event streams feature is a visual, no-code way to:
Capture data from various sources (like IoT devices, applications, databases)
Transform the data (filter, clean, reshape)
Route it to different destinations (databases, dashboards, storage)
All of this happens without writing a single line of code! It's like building with LEGO blocks: you connect pieces together visually.
Creating Your Event Stream
Let's create an event stream that captures data from a sample source and routes it to a KQL database.
From your workspace:
Click "New item" in the toolbar
Scroll down or search for "Eventstream"
Click on it
Give it a meaningful name like "SalesDataStream" or "IoTSensorStream"
Click "Create"

Understanding the Canvas
The canvas you're looking at works left to right:
Left side: Where data comes from (sources)
Middle: Where you transform data (operations)
Right side: Where data goes (destinations)
Think of it as a pipeline: Source → Transform → Destination
Adding a Data Source
Let's add a sample data source to get started.
Using Sample Data
On your canvas, you'll see an "Add source" option
Click it and select "Sample data"
Choose "Bicycles" (this provides sample bike rental data)
Give your source a name like "BikeRentals"
Click "Add"

Other Options to Get Your Data
Connecting to Azure Event Hubs
If you have real streaming data from Azure Event Hubs:
Click "Add source" on your event stream canvas
Select "Azure Event Hubs"
You'll need:
Connection string: Get this from your Azure Event Hub settings
Consumer group: Usually "$Default" unless you created a custom one
Event Hub name: The specific hub you're connecting to
Click "Add" and test the connection
The system will validate your credentials and start streaming data.
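Before pasting a connection string into the source settings, it can help to check what it actually points at. Event Hubs connection strings are semicolon-separated key=value pairs: Endpoint, SharedAccessKeyName, SharedAccessKey, and, for strings copied from a specific hub rather than the namespace, EntityPath. Here's a small Python sketch for inspecting one; the sample string is made up and the helper names are my own:

```python
def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Split an Event Hubs connection string into its key=value parts."""
    parts: dict[str, str] = {}
    for segment in conn_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the FIRST '=' only: SharedAccessKey values are base64
        # and often end in '=' padding.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def describe(conn_str: str) -> str:
    """One-line summary: namespace, access policy, and target hub."""
    parts = parse_connection_string(conn_str)
    host = parts.get("Endpoint", "").removeprefix("sb://")
    namespace = host.split(".")[0]
    hub = parts.get("EntityPath", "<missing: enter the Event Hub name separately>")
    return f"namespace={namespace}, policy={parts.get('SharedAccessKeyName')}, hub={hub}"


sample = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=listen-policy;SharedAccessKey=abc123=;EntityPath=bike-rentals"
)
print(describe(sample))
# namespace=my-namespace, policy=listen-policy, hub=bike-rentals
```

If EntityPath is missing, you copied a namespace-level string, so you'll need to enter the Event Hub name in its own field.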
Custom Applications
Got your own application that generates data? You can stream directly from it:
Click "Add source"
Select "Custom App"
You'll receive an endpoint URL and a key
Use these in your application to send data
We'll explore this in more detail later when we cover notebooks.
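Here's a minimal sketch of what "use these in your application" might look like in Python. It assumes the Custom App source gives you an Event Hub-compatible connection string that includes EntityPath, and that the azure-eventhub package is installed. The event fields mirror the bike rental schema used in this article; the helper names are hypothetical:

```python
import json
import random
from datetime import datetime, timezone
from typing import Optional


def make_rental_event(bike_id: str, station_id: str, duration_min: int,
                      ts: Optional[datetime] = None) -> dict:
    """Build one rental event using the field names from this article."""
    ts = ts or datetime.now(timezone.utc)
    return {
        "BikeID": bike_id,
        "StationID": station_id,
        "RentalDuration": duration_min,
        "Timestamp": ts.isoformat(),  # ISO 8601 string, e.g. 2024-05-01T08:15:00+00:00
    }


def send_events(connection_string: str, events: list[dict]) -> None:
    """Send events as JSON to an Event Hub-compatible endpoint.

    Requires `pip install azure-eventhub`. Assumes the connection string
    includes EntityPath; otherwise pass eventhub_name=... as well.
    """
    # Imported here so the helper above works without the SDK installed.
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(connection_string)
    with producer:
        batch = producer.create_batch()
        for event in events:
            batch.add(EventData(json.dumps(event)))  # raises ValueError if the batch is full
        producer.send_batch(batch)


if __name__ == "__main__":
    demo = [make_rental_event(f"BIKE{n:03d}", "ST01", random.randint(5, 90)) for n in range(3)]
    print(json.dumps(demo, indent=2))
    # send_events("<connection string from your Custom App source>", demo)
```

Sending JSON keeps things simple on the eventstream side, since JSON is the input format you'll pick when routing to a KQL database.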
Viewing Your Data
Once your source is connected:
Click on the source node in your canvas
Look at the bottom panel
Select the "Data preview" tab
You'll see your data flowing in real-time!
Routing Streaming Data to KQL Database
Now that data is flowing, let's store it in a KQL database where we can query and analyze it.
Creating the Connection
Step 1: Add a Destination
From your event stream canvas:
Hover over "Add Destinations" in the menu
Select the dropdown icon
Choose "Eventhouse"

A new node appears on your canvas labeled "Eventhouse."
Step 2: Configure the KQL Database Destination
Click on the Eventhouse node. A configuration panel opens on the right:
Destination name: Give it a clear name like "BikeRentalsDB"
Workspace: Select your workspace from the dropdown
Eventhouse: Choose your existing eventhouse (or create a new one)
KQL Database: Select the database you created in our previous session
Table: Either select an existing table or create a new one
For this example, let's create a new table called "RentalEvents"

Step 3: Configure Data Format
You need to tell the system what format your data is in:
Input data format: Select the format (usually JSON for most streaming sources)
Mapping: The system will automatically detect your data structure
Review the schema preview. You'll see columns like:
BikeID
StationID
RentalDuration
Timestamp
Step 4: Activate Ingestion
At the bottom of the configuration panel, you'll see a checkbox:
- ✓ Activate ingestion after adding the data
Make sure this is checked. Then click "Add."
Step 5: Publish Your Changes
Your changes aren't active until you publish:
Look at the top of your screen
Find the "Publish" button
Click it
Your event stream is now live! Data is flowing from your source into your KQL database in real-time.

Verifying Data is Arriving
Let's check that everything is working:
Navigate to your KQL database (from your workspace)
Find your "RentalEvents" table
Click on it
In the query window, type:
RentalEvents
| take 10
Click "Run"
If you see data, congratulations! Your real-time pipeline is working. Refresh every few seconds and you'll see new data appearing.
Event Processing and Transformation
Raw data isn't always in the perfect format. Event processing lets you clean, filter, and transform data before it reaches your database.
Why Transform Data?
Imagine your bike rental data includes test records or data with errors:
Test rentals with BikeID = "TEST001"
Rentals with negative durations (errors)
Personal information you don't need
Processing helps you:
Filter out bad data
Remove unnecessary fields
Calculate new values
Aggregate information
Adding Transformations
Let's add some processing to our event stream.
Step 1: Add a Filter Operation
Between your source and destination:
Hover over your source node's right edge
Click the green plus icon
Select "Operations" → "Filter"
A filter node appears
Step 2: Configure the Filter
Click on the filter node. In the configuration panel:
Field: Select "BikeID"
Condition: Select "doesn't contain"
Value: Type "TEST"
This removes all test data from your stream.
Step 3: Add More Filters
Let's add another filter for data quality:
Add another filter operation
Field: "RentalDuration"
Condition: "is greater than"
Value: "0"
Now you're only keeping valid rentals with positive durations.
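The two filters run one after another, so an event only reaches the destination if it passes both. Here's a plain-Python equivalent you can use to sanity-check sample records before publishing. It illustrates the logic rather than how Fabric evaluates filters internally; the sample events are made up, and treating a missing or non-numeric duration as invalid is my assumption:

```python
def passes_filters(event: dict) -> bool:
    """Mirror the two eventstream filters configured above."""
    # Filter 1: BikeID doesn't contain "TEST" (case-sensitive in this sketch)
    if "TEST" in str(event.get("BikeID", "")):
        return False
    # Filter 2: RentalDuration is greater than 0.
    # Assumption: a missing or non-numeric duration counts as invalid.
    duration = event.get("RentalDuration")
    return isinstance(duration, (int, float)) and duration > 0


events = [
    {"BikeID": "BIKE001", "RentalDuration": 25},
    {"BikeID": "TEST001", "RentalDuration": 40},  # test record
    {"BikeID": "BIKE002", "RentalDuration": -5},  # negative duration
    {"BikeID": "BIKE003", "RentalDuration": 0},   # zero is not greater than 0
    {"BikeID": "BIKE004"},                        # duration missing
]
clean = [e for e in events if passes_filters(e)]
print([e["BikeID"] for e in clean])  # ['BIKE001']
```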
Step 4: Managing Fields
Maybe you don't need all columns. Let's select only what matters:
Add a "Manage Fields" operation
Choose "Keep fields"
Select only:
Timestamp
BikeID
StationID
RentalDuration
This reduces your data size and processing time.
Step 5: Creating Calculated Fields
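Want to add new information? Add a calculated field. (In Fabric eventstreams, the "Expand" operation splits an array into one row per element; new fields are added through "Manage fields".)
Add a "Manage fields" operation (or reuse the one from Step 4)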
Click "Add field"
Field name: "RentalCategory"
Function: Use a condition:
If RentalDuration > 60, then "Long"
Else "Short"
Now each event is categorized automatically!
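Steps 4 and 5 together reshape every event: drop the fields you don't need, then derive a new one. Sketched in plain Python (CustomerEmail is a hypothetical field standing in for the personal information you'd want to remove):

```python
# Step 4: the fields to keep (everything else is dropped)
KEEP_FIELDS = ("Timestamp", "BikeID", "StationID", "RentalDuration")


def shape_event(event: dict) -> dict:
    """Keep only KEEP_FIELDS, then add the calculated RentalCategory field."""
    kept = {name: event[name] for name in KEEP_FIELDS if name in event}
    # Step 5: if RentalDuration > 60 then "Long", else "Short"
    kept["RentalCategory"] = "Long" if kept.get("RentalDuration", 0) > 60 else "Short"
    return kept


raw = {
    "Timestamp": "2024-05-01T08:15:00Z",
    "BikeID": "BIKE001",
    "StationID": "ST07",
    "RentalDuration": 75,
    "CustomerEmail": "someone@example.com",  # hypothetical personal field to drop
}
print(shape_event(raw))
# {'Timestamp': '2024-05-01T08:15:00Z', 'BikeID': 'BIKE001', 'StationID': 'ST07',
#  'RentalDuration': 75, 'RentalCategory': 'Long'}
```

Note the boundary: a rental of exactly 60 minutes is "Short", because the condition is strictly greater than 60.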
Aggregate Operations
Sometimes you don't want every single event—you want summaries.
Creating Aggregations:
Add an "Aggregate" operation
Choose your aggregation window: "Tumbling window"
Set duration: "5 minutes"
Group by: "StationID"
Calculate:
Count of rentals
Average duration
Sum of durations
This creates 5-minute summaries for each station instead of individual events.
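A tumbling window chops time into fixed, non-overlapping blocks (08:00-08:05, 08:05-08:10, and so on), and every event lands in exactly one block. This plain-Python sketch reproduces the idea so you can see what the Aggregate operation produces. The output column names (WindowStart, RentalCount, AvgDuration, TotalDuration) are illustrative and may differ from what your eventstream generates:

```python
from collections import defaultdict
from datetime import datetime, timedelta


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Floor a timestamp to its window, so 5-minute windows start at :00, :05, ..."""
    return ts - (ts - datetime.min) % size


def tumbling_aggregate(events: list[dict], size: timedelta = timedelta(minutes=5)) -> list[dict]:
    """Count, average, and sum RentalDuration per (window, StationID)."""
    buckets: dict[tuple, list] = defaultdict(list)
    for e in events:
        key = (window_start(e["Timestamp"], size), e["StationID"])
        buckets[key].append(e["RentalDuration"])
    results = []
    for (start, station), durations in sorted(buckets.items()):
        results.append({
            "WindowStart": start,
            "WindowEnd": start + size,
            "StationID": station,
            "RentalCount": len(durations),
            "AvgDuration": sum(durations) / len(durations),
            "TotalDuration": sum(durations),
        })
    return results


t0 = datetime(2024, 5, 1, 8, 0)
events = [
    {"Timestamp": t0 + timedelta(minutes=1), "StationID": "ST01", "RentalDuration": 20},
    {"Timestamp": t0 + timedelta(minutes=3), "StationID": "ST01", "RentalDuration": 40},
    {"Timestamp": t0 + timedelta(minutes=4), "StationID": "ST02", "RentalDuration": 15},
    {"Timestamp": t0 + timedelta(minutes=7), "StationID": "ST01", "RentalDuration": 30},
]
for r in tumbling_aggregate(events):
    print(r["WindowStart"].strftime("%H:%M"), r["StationID"],
          r["RentalCount"], r["AvgDuration"], r["TotalDuration"])
# 08:00 ST01 2 30.0 60
# 08:00 ST02 1 15.0 15
# 08:05 ST01 1 30.0 30
```

Four events become three summary rows; the 08:07 rental at ST01 starts a new window rather than being added to the 08:00 one.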
Publishing Your Transformations
After adding all your operations:
Verify the flow looks correct on your canvas
Test each node by checking its data preview
Click "Publish" at the top
Your transformations are now active! Clean, processed data flows into your database.
Connecting KQL Database to Power BI
Now let's visualize your real-time data in Power BI dashboards.
Understanding Connection Modes
Power BI offers two ways to connect to KQL databases:
Import Mode: Copies data into Power BI
Pros: Fast dashboard performance
Cons: Data refreshes periodically, not truly real-time
Best for: Small datasets, historical analysis
DirectQuery Mode: Queries data directly from KQL database
Pros: Always shows current data, near real-time
Cons: Dashboard depends on database performance
Best for: Large datasets, real-time monitoring
For our streaming scenario, we'll use DirectQuery.
Connecting from Power BI Desktop
Step 1: Open Power BI Desktop
If you don't have it, download it free from Microsoft's store or website.
Step 2: Get Data from KQL Database
On the Home ribbon, click "Get Data"
In the dropdown, select "More..."
Search for "KQL Database"
Select "KQL Database" from the results
Click "Connect"
Alternatively:
Click "OneLake data hub" on the Home ribbon
Select "KQL Databases"
You'll see all KQL databases you have access to
Step 3: Select Your Database
Browse the list of available databases
Find the KQL database that contains your "RentalEvents" table
Select it
Click "Connect"
Step 4: Authenticate
A window asks for credentials:
Select "Organizational account"
Click "Sign in"
Use your Fabric credentials
Click "Connect"
Step 5: Select Tables
The Navigator window shows your tables:
Check the boxes next to tables you want:
- ✓ RentalEvents
You can click "Transform Data" to modify in Power Query
Or click "Load" to load it into your report as is
Step 6: Choose Connection Mode
A "Connection settings" dialog appears:
Select "DirectQuery" (for real-time data)
Click "OK"
Your data is now connected!
Creating Real-Time Visuals
Let's build a dashboard that updates automatically.
Visual 1: Rentals Over Time
From the Visualizations pane, select "Line chart"
Axis: Drag "Timestamp" here
Values: Drag a count of "BikeID" here
Add a relative date filter on "Timestamp" in the Filters pane to show the last 24 hours
This shows rental trends throughout the day.
Visual 2: Rentals by Station
Select "Map" visualization
Location: "StationID" won't geocode on its own, so use station latitude and longitude fields if your data has them
Size: Count of rentals
Larger bubbles show busier stations
Perfect for seeing geographic patterns!
Visual 3: Average Duration
Select "Card" visualization
Add "RentalDuration"
Change aggregation to "Average"
Shows current average rental time.
Visual 4: Real-Time Table
Select "Table" visualization
Add all relevant fields
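To show only recent data, filter the table with a KQL query (for example, one entered in the optional query box of the KQL Database connector dialog):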
RentalEvents
| where Timestamp > ago(5m)
| order by Timestamp desc
Setting Up Auto-Refresh
To make your dashboard truly real-time:
Click an empty area of the report canvas so the page itself is selected
In the Visualizations pane, open the "Format page" settings
Turn on "Page refresh" (this card appears only when the report uses a DirectQuery source)
Set the interval to "30 seconds" or "1 minute"
Now your dashboard updates automatically!
Publishing to Fabric
Share your dashboard with your team:
Click "Publish" on the Home ribbon
Select your Fabric workspace
Click "Select"
Streaming Data into Lakehouse
Sometimes you want to store streaming data in a lakehouse for long-term analytics, machine learning, or data science.
Why Stream to Lakehouse?
KQL databases are optimized for real-time queries but can be expensive for long-term storage. Lakehouses are perfect for:
Historical data (keeping years of records)
Data science and machine learning
Cost-effective storage
Integration with Spark notebooks
Adding Lakehouse as Destination
Return to your event stream canvas.
Step 1: Add Lakehouse Destination
From your event stream, click "Add destination"
Select "Lakehouse"
A lakehouse node appears on your canvas
Step 2: Configure the Lakehouse
Click the lakehouse node:
Destination name: "BikeRentalsLakehouse"
Workspace: Your workspace
Lakehouse: Select existing or create new
Delta table: Create new table "rental_history"
Input data format: JSON (or your data format)
Step 3: Configure Ingestion Mode
This is important for performance:
Minimum rows per file:
Lower number (like 100) = More files, slower queries
Higher number (like 10,000) = Fewer files, better performance
Recommended: 1,000 to 10,000 for most scenarios
Maximum duration:
How long to wait before writing a file
Minimum: 1 minute
Maximum: 2 hours
Recommended: 5 to 15 minutes for real-time scenarios
For our example:
Minimum rows: 5,000
Maximum duration: 10 minutes
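To see how the two thresholds interact, here's a back-of-the-envelope calculator. It assumes a file is written as soon as either threshold is reached (enough rows collected, or the maximum duration elapsed), which is how I read these settings; verify against your own lakehouse before relying on it. The event rates are made-up examples:

```python
def flush_interval_seconds(events_per_sec: float, min_rows: int, max_duration_s: float) -> float:
    """Seconds between file writes, ASSUMING a file is written as soon as
    either threshold is hit: min_rows collected or max_duration_s elapsed."""
    if events_per_sec <= 0:
        return float("inf")  # no events, so no files are written
    return min(min_rows / events_per_sec, max_duration_s)


def files_per_hour(events_per_sec: float, min_rows: int, max_duration_s: float) -> float:
    return 3600 / flush_interval_seconds(events_per_sec, min_rows, max_duration_s)


MIN_ROWS = 5_000
MAX_DURATION_S = 10 * 60  # 10 minutes

for rate in (2, 20, 200):  # hypothetical event rates
    interval = flush_interval_seconds(rate, MIN_ROWS, MAX_DURATION_S)
    print(f"{rate:>4} events/s: a file every {interval:.0f} s, "
          f"about {files_per_hour(rate, MIN_ROWS, MAX_DURATION_S):.0f} files per hour")
```

Under that assumption, with the 5,000-row / 10-minute settings above, a quiet stream (2 events per second) writes a file every 10 minutes, while a busy one (200 events per second) fills a file every 25 seconds, or about 144 files per hour. If you end up with lots of small files, raise Minimum rows.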
Step 4: Add and Publish
Click "Add"
Connect the lakehouse to your stream or transformation nodes
Click "Publish"
Data now flows to both your KQL database (for real-time queries) and your lakehouse (for historical storage)!
Verifying Lakehouse Data
Check your lakehouse:
Navigate to your lakehouse in your workspace
Go to the "Tables" section
Find "rental_history"
Click on it
You'll see your streaming data being written in Delta format—optimized for analytics!
Using Lakehouse Data
The data in your lakehouse can be used by:
Notebooks: For Python/Spark analysis
Power BI: For historical reports
Data pipelines: For ETL processes
Machine learning: For model training
Real-Time Analytics in Notebooks
Notebooks give you code-based control over streaming data, perfect for complex transformations and machine learning.
Understanding Spark Structured Streaming
Spark notebooks in Fabric support "structured streaming", a way to process continuous data using the same code you'd use for batch data.
Creating a Streaming Notebook
Step 1: Create a Notebook
In your workspace, click "New item"
Select "Notebook"
Name it "RealTimeAnalysis"
Step 2: Attach to Lakehouse
On the left side, click "Add lakehouse"
Select "Existing lakehouse"
Choose the lakehouse you created in the previous section
Click "Add"
Reading Streaming Data from Event Hub
Let's stream data directly from Azure Event Hub into our notebook.
Streaming Setup (Python):
# Requires the Azure Event Hubs connector for Spark
# (Maven package com.microsoft.azure:azure-eventhubs-spark_2.12) in your environment
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType
# Configuration for Event Hub connection
# The connection string must include EntityPath=<your event hub name>
eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING"
# The connector expects the connection string to be encrypted
ehConf = {
    'eventhubs.connectionString': spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eventHubConnectionString),
    'eventhubs.consumerGroup': '$Default'
}
# Read streaming data
streamingDF = (spark
    .readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)
# Define your data schema
schema = StructType() \
    .add("BikeID", StringType()) \
    .add("StationID", StringType()) \
    .add("RentalDuration", IntegerType()) \
    .add("Timestamp", TimestampType())
# Parse the JSON in the Event Hub message body
parsedDF = streamingDF \
    .select(from_json(col("body").cast("string"), schema).alias("data")) \
    .select("data.*")
# Display streaming data
display(parsedDF)
This creates a continuously updating display of your streaming data!
Processing Streaming Data
Let's add some transformations:
from pyspark.sql.functions import avg, count, window
# Calculate metrics per 5-minute window
aggregatedDF = parsedDF \
    .withWatermark("Timestamp", "10 minutes") \
    .groupBy(
        window("Timestamp", "5 minutes"),
        "StationID"
    ) \
    .agg(
        count("BikeID").alias("rental_count"),
        avg("RentalDuration").alias("avg_duration")
    )
# Display results
display(aggregatedDF)
This calculates 5-minute tumbling-window statistics for each station!
Writing Streaming Data to Lakehouse
Store your processed stream:
# Write to a Delta table in the lakehouse
query = aggregatedDF \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rental_metrics") \
    .toTable("rental_metrics")
# The stream runs continuously
# To stop it later, use: query.stop()
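One thing that surprises people with outputMode("append"): a window's row isn't written the moment the window ends. Spark waits until the watermark (the latest event time seen, minus the 10-minute delay set with withWatermark) moves past the window's end. This simplified pure-Python model shows the effect; real Spark advances the watermark between micro-batches, so actual output lags slightly more, and the event times here are made up:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # window("Timestamp", "5 minutes")
DELAY = timedelta(minutes=10)   # withWatermark("Timestamp", "10 minutes")


def window_bounds(ts: datetime) -> tuple[datetime, datetime]:
    start = ts - (ts - datetime.min) % WINDOW
    return start, start + WINDOW


def simulate_append(event_times: list[datetime]) -> list[tuple[datetime, list[datetime]]]:
    """For each arriving event, list the window starts emitted right after it.

    Simplified model: watermark = latest event time seen - DELAY, and a window
    is emitted once the watermark reaches its end. Events for windows that
    are already closed are treated as too late and ignored.
    """
    latest = None
    open_windows: set[datetime] = set()  # window starts still accumulating
    log = []
    for ts in event_times:
        latest = ts if latest is None else max(latest, ts)
        watermark = latest - DELAY
        start, end = window_bounds(ts)
        if end > watermark:
            open_windows.add(start)
        ready = sorted(s for s in open_windows if s + WINDOW <= watermark)
        open_windows.difference_update(ready)
        log.append((ts, ready))
    return log


t0 = datetime(2024, 5, 1, 8, 0)
arrivals = [t0 + timedelta(minutes=m) for m in (1, 3, 12, 16, 2)]  # the last one is late
for ts, emitted in simulate_append(arrivals):
    print(ts.strftime("%H:%M"), "->", [s.strftime("%H:%M") for s in emitted])
# 08:01 -> []
# 08:03 -> []
# 08:12 -> []
# 08:16 -> ['08:00']
# 08:02 -> []
```

The 08:00-08:05 window only lands in the table once an event stamped after 08:15 arrives, and the late 08:02 event that shows up afterwards is ignored. A longer watermark tolerates later data but delays every result by the same amount.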
Real-Time Machine Learning
Apply ML models to streaming data:
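from pyspark.ml import PipelineModel
# Load your pre-trained model. This assumes it was saved as a PipelineModel that
# includes its own feature steps (e.g. a VectorAssembler); a bare
# LogisticRegressionModel expects a ready-made "features" vector column
model = PipelineModel.load("Files/models/rental_prediction")
# Apply to streaming data
predictionsDF = model.transform(parsedDF)
# Write predictions to a Delta table (each streaming query needs its own checkpoint)
query = predictionsDF \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rental_predictions") \
    .toTable("rental_predictions")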
This scores every incoming rental event in real-time!
Monitoring Your Streams
Check stream health:
# Get stream status
print("Stream is active:", query.isActive)
print("Recent progress:")
print(query.recentProgress)
# Get statistics
query.status
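query.recentProgress returns the last few micro-batch reports; in PySpark 3.x each one is a plain dict with keys such as numInputRows, inputRowsPerSecond, and processedRowsPerSecond. Reading raw reports gets tedious, so here's a small summarizing helper. The "falling behind" flag is a rule-of-thumb heuristic, and the sample reports are made up:

```python
def summarize_progress(progress: list[dict]) -> dict:
    """Summarize StreamingQuery.recentProgress (one dict per micro-batch)."""
    # Skip idle batches that saw no new rows
    active = [p for p in progress if p.get("numInputRows", 0) > 0]
    if not active:
        return {"batches": 0, "avg_input_rate": 0.0,
                "avg_processed_rate": 0.0, "falling_behind": False}
    avg_in = sum(p.get("inputRowsPerSecond", 0.0) for p in active) / len(active)
    avg_out = sum(p.get("processedRowsPerSecond", 0.0) for p in active) / len(active)
    return {
        "batches": len(active),
        "avg_input_rate": round(avg_in, 1),
        "avg_processed_rate": round(avg_out, 1),
        # Heuristic: if rows arrive faster than they're processed, a backlog builds up
        "falling_behind": avg_out < avg_in,
    }


# Made-up reports with the same keys Spark uses; in a notebook you'd pass
# query.recentProgress instead.
sample = [
    {"batchId": 1, "numInputRows": 500, "inputRowsPerSecond": 50.0, "processedRowsPerSecond": 120.0},
    {"batchId": 2, "numInputRows": 0, "inputRowsPerSecond": 0.0, "processedRowsPerSecond": 0.0},
    {"batchId": 3, "numInputRows": 900, "inputRowsPerSecond": 90.0, "processedRowsPerSecond": 60.0},
]
print(summarize_progress(sample))
# {'batches': 2, 'avg_input_rate': 70.0, 'avg_processed_rate': 90.0, 'falling_behind': False}
```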
Example: E-Commerce Analytics
Let me walk you through a complete example:
Scenario: Online store wants real-time sales monitoring
Sources:
Order events from website
Inventory updates from warehouse
Customer behavior from mobile app
Event Stream Setup:
Three sources feeding one event stream
Filters remove test orders and bots
Join operations enrich orders with product details
Aggregations create 5-minute sales summaries
Destinations:
KQL Database "SalesRealTime":
Stores last 90 days for real-time queries
Powers live dashboards
30-day caching for performance
Lakehouse "SalesHistory":
Stores all historical data
Used for annual reports and ML
Archived after 7 years
Power BI Dashboard shows:
Revenue per minute (line chart)
Top products (bar chart)
Geographic sales (map)
Inventory alerts (cards)
Customer segments (pie chart)
Notebooks provide:
Demand forecasting
Fraud detection
Customer lifetime value prediction
Automated anomaly detection
Result: Business sees sales trends instantly, predicts demand accurately, and responds to problems in minutes instead of hours!
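The retention and caching numbers in this example map onto KQL database policies. Here's a sketch that builds the two management commands for the 90-day retention and 30-day hot cache described above, plus an optional helper that runs them through the azure-kusto-data package. The query URI placeholder and the helper names are assumptions; you can also paste the generated commands straight into the KQL query editor:

```python
def policy_commands(database: str, retention_days: int, hot_cache_days: int) -> list[str]:
    """Build KQL management commands for retention and hot caching."""
    if retention_days <= 0 or hot_cache_days <= 0:
        raise ValueError("periods must be positive numbers of days")
    return [
        # Data older than this becomes eligible for removal
        f".alter-merge database {database} policy retention softdelete = {retention_days}d",
        # The most recent data is kept in hot cache for fast queries
        f".alter database {database} policy caching hot = {hot_cache_days}d",
    ]


def apply_commands(query_uri: str, database: str, commands: list[str]) -> None:
    """Run the commands (requires `pip install azure-kusto-data` and `az login`)."""
    # Imported here so policy_commands works without the SDK installed.
    from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

    kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(query_uri)
    client = KustoClient(kcsb)
    for command in commands:
        client.execute_mgmt(database, command)


if __name__ == "__main__":
    for cmd in policy_commands("SalesRealTime", retention_days=90, hot_cache_days=30):
        print(cmd)
    # apply_commands("<your eventhouse query URI>", "SalesRealTime",
    #                policy_commands("SalesRealTime", 90, 30))
```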
Troubleshooting Common Issues
Event Stream Not Receiving Data
Check:
Is the source connection valid? (Look for green checkmark)
Are credentials correct?
Is the source actually sending data?
Check the "Data preview" tab on source node
Solution:
Refresh the connection
Verify source is active
Check Azure Event Hub metrics
Data Not Appearing in KQL Database
Check:
Is the event stream published?
Is ingestion activated?
Are there transformation errors?
Check table permissions
Solution:
// Check ingestion failures
.show ingestion failures
Power BI Dashboard Not Updating
Check:
Is DirectQuery enabled?
Is auto-refresh configured?
Are there query timeout errors?
Solution:
Simplify complex queries
Adjust refresh interval
Check database performance
Notebook Stream Stops
Check:
Is checkpoint location accessible?
Are there schema mismatches?
Is cluster still running?
Solution:
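# Restart with a new checkpoint location (placeholder path)
# Note: a new checkpoint discards the stream's saved progress, so reading
# resumes from the source's configured starting position
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/new/checkpoint/path") \
    .toTable("rental_metrics")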
Enjoyed this article? Share it with your dev and data friends!
Got questions or want to share what you've built?
Drop a comment below. I read every single one and love seeing what you create with these tools!

