Implementing Real-Time Event Stream Processing with KQL in Fabric

If you've been following along with our series, you've already mastered the KQL basics: creating KQL databases, querying data, and building visualizations. That's fantastic! You now have a solid foundation.

Missed the previous article? I'd highly recommend reading "Mastering KQL Databases in Microsoft Fabric" first. It covers everything from database creation to writing queries and creating charts, and you'll want that knowledge before diving into today's topics.

In today's blog, we’re moving from static data to real-time streaming, where data flows continuously into your systems like water through pipes.

By the end of this article, you'll know how to:

  1. Get Data From Multiple Sources

  2. Create Event Streams

  3. Route Streaming Data to KQL Databases

  4. Process Events in Real-Time

  5. Connect KQL to Power BI

  6. Stream into Lakehouses

  7. Work with Real-Time Data in Notebooks

  8. Manage Data Retention

We're building a complete real-time analytics solution from scratch. Let's get started!

Understanding Event Streams

Before we dive into the technical steps, let me explain what event streams are in simple terms.

Imagine you own a chain of retail stores. Every second, thousands of things are happening:

  • Customers are making purchases

  • Products are being scanned at checkout

  • Inventory is being updated

  • Security cameras are detecting movement

Traditional systems wait, collect all this data, and process it later, maybe once a day. That's like reading yesterday's newspaper to understand today's news.

Event streams work differently. They capture these activities the instant they happen and let you analyze them immediately.

It's like watching live news instead of reading yesterday's paper. This is what we call "real-time analytics."

Microsoft Fabric's event streams feature is a visual, no-code way to:

  • Capture data from various sources (like IoT devices, applications, databases)

  • Transform the data (filter, clean, reshape)

  • Route it to different destinations (databases, dashboards, storage)

All of this happens without writing a single line of code! It's like building with LEGO blocks: you connect pieces together visually.

Creating Your Event Stream

Let's create an event stream that captures data from a sample source and routes it to a KQL database.

From your workspace:

  • Click "New item" in the toolbar

  • Scroll down or search for "Eventstream"

  • Click on it

  • Give it a meaningful name like "SalesDataStream" or "IoTSensorStream"

  • Click "Create"

Understanding the Canvas

The canvas you're looking at works left to right:

  • Left side: Where data comes from (sources)

  • Middle: Where you transform data (operations)

  • Right side: Where data goes (destinations)

Think of it as a pipeline: Source → Transform → Destination

Adding a Data Source

Let's add a sample data source to get started.

Using Sample Data

  1. On your canvas, you'll see an "Add source" option

  2. Click it and select "Sample data"

  3. Choose "Bicycles" (this provides sample bike rental data)

  4. Give your source a name like "BikeRentals"

  5. Click "Add"

Other Ways to Get Your Data

Connecting to Azure Event Hubs

If you have real streaming data from Azure Event Hubs:

  1. Click "Add source" on your event stream canvas

  2. Select "Azure Event Hubs"

  3. You'll need:

    • Connection string: Get this from your Azure Event Hub settings

    • Consumer group: Usually "$Default" unless you created a custom one

    • Event Hub name: The specific hub you're connecting to

  4. Click "Add" and test the connection

The system will validate your credentials and start streaming data.
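An Event Hubs connection string is a semicolon-separated list of key=value pairs. The small helper below (`parse_connection_string` is a hypothetical name, not part of any SDK) splits one into its parts, which makes it easy to check whether the string already names a specific hub through `EntityPath`. The sample string uses placeholder values only.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs connection string into its key=value parts."""
    parts = {}
    for segment in conn_str.strip().strip(";").split(";"):
        if not segment:
            continue  # tolerate doubled or trailing semicolons
        # Split on the first "=" only: base64 key values can end in "="
        key, _, value = segment.partition("=")
        parts[key.strip()] = value.strip()
    return parts


# Placeholder values, not a real namespace or key
sample = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123=;"
    "EntityPath=bike-rentals"
)

parts = parse_connection_string(sample)
print(parts["Endpoint"])        # sb://my-namespace.servicebus.windows.net/
print(parts.get("EntityPath"))  # bike-rentals
```

A namespace-level connection string has no `EntityPath`, which is why tools that use one also ask for the Event Hub name separately.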

Custom Applications

Got your own application that generates data? You can stream directly from it:

  1. Click "Add source"

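  2. Select "Custom endpoint" (labeled "Custom App" in older versions of the editor)

  3. You'll receive connection details: an endpoint and a key, also available as an Event Hub-compatible connection string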

  4. Use these in your application to send data

We'll explore this in more detail later when we cover notebooks.
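Because the custom endpoint speaks the Event Hubs protocol, the Azure Event Hubs client library for Python can send to it. Here's a minimal sketch, assuming the `azure-eventhub` package is installed and that you paste in the connection string shown on the endpoint's details page. `build_rental_event` and `send_events` are hypothetical helpers, and the JSON fields follow this article's example rental schema.

```python
import json
from datetime import datetime, timezone


def build_rental_event(bike_id: str, station_id: str, duration_minutes: int) -> str:
    """Serialize one rental event as JSON using the article's example schema."""
    return json.dumps({
        "BikeID": bike_id,
        "StationID": station_id,
        "RentalDuration": duration_minutes,
        "Timestamp": datetime.now(timezone.utc).isoformat(),
    })


def send_events(connection_string: str, payloads: list[str]) -> None:
    """Send JSON payloads in one batch (requires `pip install azure-eventhub`)."""
    # Imported here so the rest of the sketch runs without the SDK installed
    from azure.eventhub import EventData, EventHubProducerClient

    # Assumption: the custom endpoint's connection string includes EntityPath,
    # so no separate eventhub_name is passed
    producer = EventHubProducerClient.from_connection_string(connection_string)
    with producer:
        batch = producer.create_batch()
        for payload in payloads:
            batch.add(EventData(payload))
        producer.send_batch(batch)


if __name__ == "__main__":
    events = [build_rental_event("B042", "S07", 25), build_rental_event("B108", "S03", 72)]
    print(events[0])
    # send_events("YOUR_CUSTOM_ENDPOINT_CONNECTION_STRING", events)
```

Batching several events per send keeps network round trips down; for a long-running app you would keep one producer open rather than creating it per batch.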

Viewing Your Data

Once your source is connected:

  1. Click on the source node in your canvas

  2. Look at the bottom panel

  3. Select the "Data preview" tab

You'll see your data flowing in real-time!

Routing Streaming Data to KQL Database

Now that data is flowing, let's store it in a KQL database where we can query and analyze it.

Creating the Connection

Step 1: Add a Destination

From your event stream canvas:

  • Hover over "Add Destinations" in the menu

  • Select the drop down icon

  • Choose "Eventhouse"

A new node appears on your canvas labeled "Eventhouse."

Step 2: Configure the KQL Database Destination

Click on the Eventhouse node. A configuration panel opens on the right:

  • Destination name: Give it a clear name like "BikeRentalsDB"

  • Workspace: Select your workspace from the dropdown

  • Eventhouse: Choose your existing eventhouse (or create a new one)

  • KQL Database: Select the database you created in our previous article

  • Table: Either select an existing table or create a new one

For this example, let's create a new table called "RentalEvents"

Step 3: Configure Data Format

You need to tell the system what format your data is in:

  • Input data format: Select the format your source sends (JSON for most streaming sources)

  • Mapping: The system will automatically detect your data structure

Review the schema preview. The exact columns depend on your source; the examples in this article assume a rental schema with columns like:

  • BikeID

  • StationID

  • RentalDuration

  • Timestamp

Step 4: Activate Ingestion

At the bottom of the configuration panel, you'll see a checkbox:

  • Activate ingestion after adding the data

Make sure this is checked. Then click "Add."

Step 5: Publish Your Changes

Your changes aren't active until you publish:

  1. Look at the top of your screen

  2. Find the "Publish" button

  3. Click it

Your event stream is now live! Data is flowing from your source into your KQL database in real-time.

Verifying Data is Arriving

Let's check that everything is working:

  1. Navigate to your KQL database (from your workspace)

  2. Find your "RentalEvents" table

  3. Click on it

  4. In the query window, type:

RentalEvents
| take 10
  5. Click "Run"

If you see data, congratulations! Your real-time pipeline is working. Rerun the query every few seconds and you'll see new rows appearing.

Event Processing and Transformation

Raw data isn't always in the perfect format. Event processing lets you clean, filter, and transform data before it reaches your database.

Why Transform Data?

Imagine your bike rental data includes test records or data with errors:

  • Test rentals with BikeID = "TEST001"

  • Rentals with negative durations (errors)

  • Personal information you don't need

Processing helps you:

  • Filter out bad data

  • Remove unnecessary fields

  • Calculate new values

  • Aggregate information

Adding Transformations

Let's add some processing to our event stream.

Step 1: Add a Filter Operation

Between your source and destination:

  1. Hover over your source node's right edge

  2. Click the green plus icon

  3. Select "Operations" → "Filter"

  4. A filter node appears

Step 2: Configure the Filter

Click on the filter node. In the configuration panel:

  1. Field: Select "BikeID"

  2. Condition: Select "doesn't contain"

  3. Value: Type "TEST"

This removes all test data from your stream.

Step 3: Add More Filters

Let's add another filter for data quality:

  1. Add another filter operation

  2. Field: "RentalDuration"

  3. Condition: "is greater than"

  4. Value: "0"

Now you're only keeping valid rentals with positive durations.
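Two filter nodes in a row keep only the events that pass both conditions, exactly like a single AND. The eventstream applies these filters without any code; the plain-Python version below just makes the combined logic explicit. It assumes "doesn't contain" is a case-sensitive substring check, and `keep_event` is a hypothetical name.

```python
def keep_event(event: dict) -> bool:
    """Mirror the two filter nodes: drop test bikes and non-positive durations."""
    not_test = "TEST" not in str(event.get("BikeID", ""))
    positive = event.get("RentalDuration", 0) > 0  # a missing duration counts as invalid
    return not_test and positive


events = [
    {"BikeID": "B042", "RentalDuration": 25},
    {"BikeID": "TEST001", "RentalDuration": 30},  # removed by the first filter
    {"BikeID": "B108", "RentalDuration": -5},     # removed by the second filter
]
clean = [e for e in events if keep_event(e)]
print(clean)  # [{'BikeID': 'B042', 'RentalDuration': 25}]
```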

Step 4: Managing Fields

Maybe you don't need all columns. Let's select only what matters:

  1. Add a "Manage Fields" operation

  2. Choose "Keep fields"

  3. Select only:

    • Timestamp

    • BikeID

    • StationID

    • RentalDuration

This reduces your data size and processing time.

Step 5: Creating Calculated Fields

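Want to add new information? Calculated fields are added with the "Manage fields" operation. (The "Expand" operation does something different: it turns each value in an array field into its own row.)

  1. Open your "Manage fields" operation (or add a new one)

  2. Click "Add field"

  3. Field name: "RentalCategory"

  4. Function: define the rule below. If the built-in functions in your eventstream don't offer a conditional, you can compute this column after ingestion in KQL instead.

    • If RentalDuration > 60, then "Long"

    • Else "Short"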

Now each event is categorized automatically!
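Taken together, the keep-fields step and the RentalCategory rule reshape each event like this. It's a plain-Python sketch for illustration only; `shape_event` and the `CustomerEmail` field are hypothetical, standing in for any column you choose to drop.

```python
KEEP_FIELDS = ["Timestamp", "BikeID", "StationID", "RentalDuration"]


def shape_event(event: dict) -> dict:
    """Keep only the selected fields, then add the RentalCategory column."""
    shaped = {name: event[name] for name in KEEP_FIELDS if name in event}
    # Same rule as the calculated field: more than 60 is "Long", otherwise "Short"
    shaped["RentalCategory"] = "Long" if shaped.get("RentalDuration", 0) > 60 else "Short"
    return shaped


raw = {
    "Timestamp": "2024-05-01T08:15:00Z",
    "BikeID": "B108",
    "StationID": "S03",
    "RentalDuration": 72,
    "CustomerEmail": "someone@example.com",  # not in KEEP_FIELDS, so it is dropped
}
print(shape_event(raw))  # RentalCategory is "Long" and CustomerEmail is gone
```

Note the boundary: a duration of exactly 60 falls into "Short", because the rule uses "greater than".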

Aggregate Operations

Sometimes you don't want every single event—you want summaries.

Creating Aggregations:

  1. Add an "Aggregate" operation

  2. Choose your aggregation window: "Tumbling window"

  3. Set duration: "5 minutes"

  4. Group by: "StationID"

  5. Calculate:

    • Count of rentals

    • Average duration

    • Sum of durations

This creates 5-minute summaries for each station instead of individual events.
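Tumbling windows are fixed-size, non-overlapping, and back to back, so every event lands in exactly one window. The sketch below reproduces the aggregation in plain Python for illustration. It assumes windows aligned to the Unix epoch (boundaries at :00, :05, :10, and so on); the eventstream's own alignment rules may differ, and the function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    return ts - (ts - epoch) % WINDOW


def tumbling_summary(events: list[dict]) -> dict:
    """Group events by (window start, StationID) and compute count, average, and sum."""
    durations = defaultdict(list)
    for e in events:
        durations[(window_start(e["Timestamp"]), e["StationID"])].append(e["RentalDuration"])
    return {
        key: {"count": len(d), "avg": sum(d) / len(d), "sum": sum(d)}
        for key, d in durations.items()
    }


t = datetime(2024, 5, 1, 8, 0)
events = [
    {"Timestamp": t + timedelta(minutes=1), "StationID": "S03", "RentalDuration": 20},
    {"Timestamp": t + timedelta(minutes=4), "StationID": "S03", "RentalDuration": 40},
    {"Timestamp": t + timedelta(minutes=6), "StationID": "S03", "RentalDuration": 10},  # next window
]
for (start, station), stats in sorted(tumbling_summary(events).items()):
    print(start.time(), station, stats)
# 08:00:00 S03 {'count': 2, 'avg': 30.0, 'sum': 60}
# 08:05:00 S03 {'count': 1, 'avg': 10.0, 'sum': 10}
```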

Publishing Your Transformations

After adding all your operations:

  1. Verify the flow looks correct on your canvas

  2. Test each node by checking its data preview

  3. Click "Publish" at the top

Your transformations are now active! Clean, processed data flows into your database.

Connecting KQL Database to Power BI

Now let's visualize your real-time data in Power BI dashboards.

Understanding Connection Modes

Power BI offers two ways to connect to KQL databases:

Import Mode: Copies data into Power BI

  • Pros: Fast dashboard performance

  • Cons: Data refreshes periodically, not truly real-time

  • Best for: Small datasets, historical analysis

DirectQuery Mode: Queries data directly from KQL database

  • Pros: Always shows current data, near real-time

  • Cons: Dashboard depends on database performance

  • Best for: Large datasets, real-time monitoring

For our streaming scenario, we'll use DirectQuery.

Connecting from Power BI Desktop

Step 1: Open Power BI Desktop

If you don't have it, download it free from Microsoft's store or website.

Step 2: Get Data from KQL Database

  1. On the Home ribbon, click "Get Data"

  2. In the dropdown, select "More..."

  3. Search for "KQL Database"

  4. Select "KQL Database" from the results

  5. Click "Connect"

Alternatively:

  1. Click "OneLake data hub" on the Home ribbon

  2. Select "KQL Databases"

  3. You'll see all KQL databases you have access to

Step 3: Select Your Database

  1. Browse the list of available databases

  2. Find the KQL database that contains your "RentalEvents" table

  3. Select it

  4. Click "Connect"

Step 4: Authenticate

A window asks for credentials:

  1. Select "Organizational account"

  2. Click "Sign in"

  3. Use your Fabric credentials

  4. Click "Connect"

Step 5: Select Tables

The Navigator window shows your tables:

  1. Check the boxes next to the tables you want:

    • ✓ RentalEvents

  2. Click "Transform Data" if you want to modify the data in Power Query first

  3. Or click "Load" to use the tables as they are

Step 6: Choose Connection Mode

A "Connection settings" dialog appears:

  1. Select "DirectQuery" (for real-time data)

  2. Click "OK"

Your data is now connected!

Creating Real-Time Visuals

Let's build a dashboard that updates automatically.

Visual 1: Rentals Over Time

  1. From the Visualizations pane, select "Line chart"

  2. X-axis: Drag "Timestamp" here

  3. Y-axis: Drag "BikeID" here and set its aggregation to "Count"

  4. In the Filters pane, add a relative time filter on "Timestamp" to show only the last 24 hours

This shows rental trends throughout the day.

Visual 2: Rentals by Station

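  1. Select "Map" visualization

  2. Latitude and Longitude: your station coordinates (a bare StationID can't be placed on a map, so each station needs location fields)

  3. Size: Count of rentals

Larger bubbles show busier stations.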

Perfect for seeing geographic patterns!

Visual 3: Average Duration

  1. Select "Card" visualization

  2. Add "RentalDuration"

  3. Change aggregation to "Average"

Shows current average rental time.

Visual 4: Real-Time Table

  1. Select "Table" visualization

  2. Add all relevant fields

  3. To show only recent data, enter a KQL query instead of just the table name when you connect (the connector accepts either), for example:

RentalEvents
| where Timestamp > ago(5m)
| order by Timestamp desc

Setting Up Auto-Refresh

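To make your dashboard truly real-time, turn on automatic page refresh, which works with DirectQuery sources:

  1. Click an empty area of the report canvas so the page itself is selected

  2. In the Visualizations pane, open the format options for the page

  3. Turn "Page refresh" to "On"

  4. Set the interval to "30 seconds" or "1 minute"

Once the report is published, the shortest allowed interval also depends on settings your capacity administrator controls.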

Now your dashboard updates automatically!

Publishing to Fabric

Share your dashboard with your team:

  1. Click "Publish" on the Home ribbon

  2. Select your Fabric workspace

  3. Click "Select"

Streaming Data into Lakehouse

Sometimes you want to store streaming data in a lakehouse for long-term analytics, machine learning, or data science.

Why Stream to Lakehouse?

KQL databases are optimized for real-time queries but can be expensive for long-term storage. Lakehouses are perfect for:

  • Historical data (keeping years of records)

  • Data science and machine learning

  • Cost-effective storage

  • Integration with Spark notebooks

Adding Lakehouse as Destination

Return to your event stream canvas.

Step 1: Add Lakehouse Destination

  1. From your event stream, click "Add destination"

  2. Select "Lakehouse"

  3. A lakehouse node appears on your canvas

Step 2: Configure the Lakehouse

Click the lakehouse node:

  • Destination name: "BikeRentalsLakehouse"

  • Workspace: Your workspace

  • Lakehouse: Select existing or create new

  • Delta table: Create new table "rental_history"

  • Input data format: JSON (or your data format)

Step 3: Configure Ingestion Mode

This is important for performance:

Minimum rows per file:

  • Lower number (like 100) = More files, slower queries

  • Higher number (like 10,000) = Fewer files, better performance

  • Recommended: 1,000 to 10,000 for most scenarios

Maximum duration:

  • How long to wait before writing a file

  • Minimum: 1 minute

  • Maximum: 2 hours

  • Recommended: 5 to 15 minutes for real-time scenarios

For our example:

  • Minimum rows: 5,000

  • Maximum duration: 10 minutes
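It helps to work out which setting actually triggers a write at your event rate. The sketch below assumes a file is written as soon as either threshold is reached (enough rows, or the maximum duration), whichever comes first; that is a simplification of the destination's behavior, and `seconds_until_file_write` is a hypothetical helper.

```python
def seconds_until_file_write(events_per_second: float, min_rows: int, max_duration_s: float) -> float:
    """Estimate how long rows buffer before a file is written.

    Assumes a write happens when EITHER min_rows rows have arrived OR
    max_duration_s seconds have passed, whichever comes first.
    """
    if events_per_second <= 0:
        return max_duration_s
    return min(min_rows / events_per_second, max_duration_s)


# The example settings: 5,000 rows, 10 minutes (600 s)
for rate in (2, 10, 50):
    wait = seconds_until_file_write(rate, 5_000, 600)
    print(f"{rate:>3} events/s -> file every {wait:.0f} s (~{rate * wait:,.0f} rows)")
#   2 events/s -> file every 600 s (~1,200 rows)
#  10 events/s -> file every 500 s (~5,000 rows)
#  50 events/s -> file every 100 s (~5,000 rows)
```

At low event rates the duration cap dominates and files end up smaller than the row minimum; at high rates the row minimum dominates and files are written more often than the cap.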

Step 4: Add and Publish

  1. Click "Add"

  2. Connect the lakehouse to your stream or transformation nodes

  3. Click "Publish"

Data now flows to both your KQL database (for real-time queries) and your lakehouse (for historical storage)!

Verifying Lakehouse Data

Check your lakehouse:

  1. Navigate to your lakehouse in your workspace

  2. Go to the "Tables" section

  3. Find "rental_history"

  4. Click on it

You'll see your streaming data being written in Delta format—optimized for analytics!

Using Lakehouse Data

The data in your lakehouse can be used by:

  • Notebooks: For Python/Spark analysis

  • Power BI: For historical reports

  • Data pipelines: For ETL processes

  • Machine learning: For model training

Real-Time Analytics in Notebooks

Notebooks give you code-based control over streaming data, perfect for complex transformations and machine learning.

Understanding Spark Structured Streaming

Spark notebooks in Fabric support "structured streaming", a way to process continuous data using the same code you'd use for batch data.

Creating a Streaming Notebook

Step 1: Create a Notebook

  1. In your workspace, click "New item"

  2. Select "Notebook"

  3. Name it "RealTimeAnalysis"

Step 2: Attach to Lakehouse

  1. On the left side, click "Add lakehouse"

  2. Select "Existing lakehouse"

  3. Choose the lakehouse you streamed data into earlier

  4. Click "Add"

Reading Streaming Data from Event Hub

Let's stream data directly from Azure Event Hub into our notebook.

Streaming Setup (Python):

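# Imports for parsing the Event Hub payload
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

# Configuration for Event Hub connection (replace the placeholders).
# The Spark connector reads the hub name from EntityPath in the connection string;
# if your connection string already ends with EntityPath=..., use it unchanged.
eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING"
eventHubName = "YOUR_EVENT_HUB_NAME"
connectionString = f"{eventHubConnectionString};EntityPath={eventHubName}"

# Requires the Azure Event Hubs Spark connector (azure-eventhubs-spark) in your
# Spark environment. In PySpark, the connector expects an encrypted connection string.
ehConf = {
    'eventhubs.connectionString': spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    'eventhubs.consumerGroup': '$Default'
}

# Read streaming data
streamingDF = (spark
    .readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Define your data schema
schema = StructType() \
    .add("BikeID", StringType()) \
    .add("StationID", StringType()) \
    .add("RentalDuration", IntegerType()) \
    .add("Timestamp", TimestampType())

# The payload arrives as binary in the "body" column: cast to string, then parse the JSON
parsedDF = streamingDF \
    .select(from_json(col("body").cast("string"), schema).alias("data")) \
    .select("data.*")

# Display streaming data (if display() doesn't render streams in your environment,
# write to a "memory" sink and query that table with Spark SQL instead)
display(parsedDF)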

This creates a continuously updating display of your streaming data!

Processing Streaming Data

Let's add some transformations:

from pyspark.sql.functions import avg, count, window

# Calculate metrics per 5-minute window
aggregatedDF = parsedDF \
    .withWatermark("Timestamp", "10 minutes") \
    .groupBy(
        window("Timestamp", "5 minutes"),
        "StationID"
    ) \
    .agg(
        count("BikeID").alias("rental_count"),
        avg("RentalDuration").alias("avg_duration")
    )

# Display results
display(aggregatedDF)

This calculates non-overlapping (tumbling) 5-minute statistics for each station!

Writing Streaming Data to Lakehouse

Store your processed stream:

# Write to a Delta table in the attached lakehouse.
# Append mode writes each 5-minute window once the watermark has passed it.
query = aggregatedDF \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rental_metrics") \
    .toTable("rental_metrics")

# The stream runs continuously
# To stop it later, use: query.stop()
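Because this write uses append mode on a watermarked aggregation, a window's row is written only once Spark considers that window final. The function below is a simplified model of that rule (the name is hypothetical, and real Spark advances the watermark between micro-batches rather than per event): the watermark trails the latest event time by the 10-minute delay, and a window is emitted once the watermark reaches its end.

```python
from datetime import datetime, timedelta


def is_window_emitted(
    window_start: datetime,
    max_event_time_seen: datetime,
    window: timedelta = timedelta(minutes=5),
    delay: timedelta = timedelta(minutes=10),
) -> bool:
    """Simplified check for when append mode can emit a window's final result."""
    window_end = window_start + window
    watermark = max_event_time_seen - delay  # watermark trails the newest event time
    return watermark >= window_end


start = datetime(2024, 5, 1, 8, 0)  # the 08:00-08:05 window
print(is_window_emitted(start, datetime(2024, 5, 1, 8, 14)))  # False: watermark is 08:04
print(is_window_emitted(start, datetime(2024, 5, 1, 8, 16)))  # True: watermark is 08:06
```

This is why rows show up in `rental_metrics` roughly 15 minutes after a window opens (5-minute window plus 10-minute delay) rather than immediately. A shorter watermark delay lowers that latency but drops events that arrive later than the delay.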

Real-Time Machine Learning

Apply ML models to streaming data:

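from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler

# Load your pre-trained model (path relative to the attached default lakehouse)
model = LogisticRegressionModel.load("Files/models/rental_prediction")

# The model scores a "features" vector column. The input columns below are an
# assumption: they must match the features the model was trained on.
assembler = VectorAssembler(inputCols=["RentalDuration"], outputCol="features")

# Apply to streaming data
predictionsDF = model.transform(assembler.transform(parsedDF))

# Write predictions to a Delta table (streaming writes need a checkpoint location)
predictionQuery = predictionsDF \
    .select("BikeID", "StationID", "RentalDuration", "Timestamp", "prediction") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rental_predictions") \
    .toTable("rental_predictions")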

This scores every incoming rental event in real-time!

Monitoring Your Streams

Check stream health:

# Get stream status
print("Stream is active:", query.isActive)
print("Current status:", query.status)

# Per-batch statistics (input and processing rates, rows per batch)
print("Recent progress:")
print(query.recentProgress)
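The raw progress output is verbose. In PySpark 3.x, `recentProgress` returns a list of dictionaries with keys such as `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`, so a small helper can condense it. This is a sketch: `summarize_progress` is hypothetical, and the sample data is made up to show the shape.

```python
def summarize_progress(progress: list[dict]) -> dict:
    """Condense StreamingQuery.recentProgress entries into a quick health check."""
    batches = [p for p in progress if p.get("numInputRows", 0) > 0]  # skip empty batches
    if not batches:
        return {"batches": 0, "rows": 0, "falling_behind": False}
    input_rate = sum(p.get("inputRowsPerSecond", 0.0) for p in batches) / len(batches)
    processed_rate = sum(p.get("processedRowsPerSecond", 0.0) for p in batches) / len(batches)
    return {
        "batches": len(batches),
        "rows": sum(p["numInputRows"] for p in batches),
        # Processing slower than data arrives means the stream is building a backlog
        "falling_behind": processed_rate < input_rate,
    }


sample = [
    {"numInputRows": 500, "inputRowsPerSecond": 50.0, "processedRowsPerSecond": 120.0},
    {"numInputRows": 0, "inputRowsPerSecond": 0.0, "processedRowsPerSecond": 0.0},
]
print(summarize_progress(sample))  # {'batches': 1, 'rows': 500, 'falling_behind': False}
# In the notebook: print(summarize_progress(query.recentProgress))
```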

Example: E-Commerce Analytics

Let me walk you through a complete example:

Scenario: Online store wants real-time sales monitoring

Sources:

  • Order events from website

  • Inventory updates from warehouse

  • Customer behavior from mobile app

Event Stream Setup:

  1. Three sources feeding one event stream

  2. Filters remove test orders and bots

  3. Join operations enrich orders with product details

  4. Aggregations create 5-minute sales summaries

Destinations:

KQL Database "SalesRealTime":

  • Stores last 90 days for real-time queries

  • Powers live dashboards

  • 30-day caching for performance

Lakehouse "SalesHistory":

  • Stores all historical data

  • Used for annual reports and ML

  • Archived after 7 years

Power BI Dashboard shows:

  • Revenue per minute (line chart)

  • Top products (bar chart)

  • Geographic sales (map)

  • Inventory alerts (cards)

  • Customer segments (pie chart)

Notebooks provide:

  • Demand forecasting

  • Fraud detection

  • Customer lifetime value prediction

  • Automated anomaly detection

Result: Business sees sales trends instantly, predicts demand accurately, and responds to problems in minutes instead of hours!
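The KQL Database settings in this example (90 days of retention, 30 days of hot cache) correspond to retention and caching policies. Here's a minimal sketch of setting them from Python, assuming the `azure-kusto-data` package, an Azure CLI sign-in (`az login`), and your eventhouse's Query URI; `policy_commands` and `apply_commands` are hypothetical helpers.

```python
def policy_commands(database: str, retention_days: int, hot_cache_days: int) -> list[str]:
    """Build KQL management commands for database-level retention and caching."""
    return [
        # Soft-delete data older than retention_days
        f".alter-merge database {database} policy retention softdelete = {retention_days}d",
        # Keep the most recent hot_cache_days of data in the hot cache (fast local storage)
        f".alter database {database} policy caching hot = {hot_cache_days}d",
    ]


def apply_commands(query_uri: str, database: str, commands: list[str]) -> None:
    """Run management commands (needs `pip install azure-kusto-data` and `az login`)."""
    # Imported here so policy_commands() works without the SDK installed
    from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

    kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(query_uri)
    client = KustoClient(kcsb)
    for command in commands:
        client.execute_mgmt(database, command)


commands = policy_commands("SalesRealTime", retention_days=90, hot_cache_days=30)
for command in commands:
    print(command)
# apply_commands("YOUR_EVENTHOUSE_QUERY_URI", "SalesRealTime", commands)
```

You can also paste the two printed commands straight into a KQL queryset. Retention decides how long data exists at all, while caching decides how much of it stays on fast storage, so the hot period is normally shorter than retention.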

Event Stream Not Receiving Data

Check:

  1. Is the source connection valid? (Look for green checkmark)

  2. Are credentials correct?

  3. Is the source actually sending data?

  4. Check the "Data preview" tab on source node

Solution:

  • Refresh the connection

  • Verify source is active

  • Check Azure Event Hub metrics

Data Not Appearing in KQL Database

Check:

  1. Is the event stream published?

  2. Is ingestion activated?

  3. Are there transformation errors?

  4. Check table permissions

Solution:

// Check ingestion failures
.show ingestion failures

Power BI Dashboard Not Updating

Check:

  1. Is DirectQuery enabled?

  2. Is auto-refresh configured?

  3. Are there query timeout errors?

Solution:

  • Simplify complex queries

  • Adjust refresh interval

  • Check database performance

Notebook Stream Stops

Check:

  1. Is checkpoint location accessible?

  2. Are there schema mismatches?

  3. Is the Spark session still running?

Solution:

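# Restart with a new checkpoint. Caution: a fresh checkpoint discards saved progress,
# so the stream restarts from the connector's starting position (events may be skipped or reprocessed).
query = aggregatedDF.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rental_metrics_v2") \
    .toTable("rental_metrics")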

Enjoyed this article? Share it with your dev and data friends.

  • Got questions or want to share what you've built?

  • Drop a comment below. I read every single one and love seeing what you create with these tools!