Lab 051: Fabric IQ - Real-Time Intelligence Agents

Level: L300 | Path: All paths | Time: ~75 min | 💰 Cost: Free (uses included mock dataset; Fabric capacity optional)

What You'll Learn

  • Understand how Eventstreams ingest real-time IoT sensor data into Microsoft Fabric
  • Query streaming data with KQL (Kusto Query Language) for instant anomaly detection
  • Detect temperature, humidity, and stock anomalies using threshold-based rules
  • Use Fabric Activator to trigger automated alerts when conditions are met
  • Build an AI agent that reads real-time data and surfaces actionable insights
  • Analyze warehouse activity patterns (door opens, sensor frequency) across locations

Introduction

[Figure: Fabric RTI Pipeline]

Real-Time Intelligence (RTI) in Microsoft Fabric is a fully managed platform for capturing, transforming, and acting on streaming data, all within the Fabric workspace. Unlike traditional batch pipelines that process data hours or days after it arrives, RTI gives you sub-second visibility into what's happening right now.

The Scenario

OutdoorGear Inc. operates 5 warehouses across the US (New York, Los Angeles, Chicago, Dallas, and Seattle). Each warehouse is equipped with IoT sensors monitoring four key metrics:

Sensor Type | What It Measures | Critical Threshold
temperature | Ambient temperature (°C) | > 30°C (product damage risk)
humidity | Relative humidity (%) | > 80% (moisture damage risk)
stock_level | Units remaining in bin | < 10 units (reorder urgently)
door_opens | Door open count per interval | High activity = unusual traffic

An AI agent monitors these sensor feeds in real time, detects anomalies, and triggers alerts, so warehouse managers can act before products are damaged or stock runs out.


Prerequisites

Requirement | Why
Python 3.10+ | Run analysis scripts
pandas | Analyze sensor event data

pip install pandas

Quick Start with GitHub Codespaces

All dependencies are pre-installed in the devcontainer.

📦 Supporting Files

Download these files before starting the lab

Save all files to a lab-051/ folder in your working directory.

File | Description | Download
broken_alerting.py | Bug-fix exercise (3 bugs + self-tests) | 📥 Download
sensor_events.csv | Dataset | 📥 Download

Step 1: Understanding Fabric Real-Time Intelligence

Before writing code, understand the four RTI components that form the end-to-end pipeline:

Component | Role | Analogy
Eventstream | Managed real-time data ingestion pipeline; captures events from IoT hubs, Kafka, or custom apps | The conveyor belt that brings data in
Eventhouse | Columnar time-series database optimized for streaming data; stores events for querying | The warehouse where data is stored
KQL (Kusto Query Language) | Query language for filtering, aggregating, and analyzing time-series data at scale | The SQL of real-time analytics
Activator | Rule engine that triggers automated actions (emails, Teams messages, Power Automate flows) when data conditions are met | The alarm system that acts on anomalies

How They Work Together

IoT Sensors → Eventstream → Eventhouse → KQL Queries → Activator → Alerts
                (ingest)     (store)      (analyze)     (act)
  1. Eventstream captures raw sensor events (JSON payloads) from IoT Hub or custom sources
  2. Events land in an Eventhouse table (SensorEvents) for fast columnar querying
  3. KQL queries run against the Eventhouse to detect anomalies in near-real-time
  4. Activator monitors KQL query results and fires alerts when conditions are met
  5. An AI agent can query the Eventhouse directly, correlate anomalies, and generate plain-language summaries for warehouse managers

This Lab Uses Mock Data

In production, events flow continuously from IoT Hub through Eventstreams. In this lab, we simulate the pipeline with a CSV snapshot of 50 sensor events and use pandas to demonstrate the KQL-equivalent queries. The logic maps 1:1 to production KQL.
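
As a rough illustration of how the four stages line up with the pandas stand-ins used in this lab, the sketch below walks a few events through ingest, store, analyze, and act. The JSON payloads are hypothetical (only their field names match the lab dataset), and the `alerts` list is merely a placeholder for whatever action Activator would take; none of this uses a Fabric API.

```python
import json

import pandas as pd

# Hypothetical JSON payloads shaped like the lab's sensor events.
raw_events = [
    '{"timestamp": "2026-06-15 14:20:00", "warehouse_id": "WH-NYC", '
    '"sensor_type": "temperature", "value": 38.0}',
    '{"timestamp": "2026-06-15 13:00:00", "warehouse_id": "WH-LAX", '
    '"sensor_type": "stock_level", "value": 3.0}',
    '{"timestamp": "2026-06-15 12:00:00", "warehouse_id": "WH-SEA", '
    '"sensor_type": "humidity", "value": 55.0}',
]

# 1. Ingest: parse each payload (the role Eventstream plays).
parsed = [json.loads(payload) for payload in raw_events]

# 2. Store: land the events in a table (the role Eventhouse plays).
sensor_events = pd.DataFrame(parsed)
sensor_events["timestamp"] = pd.to_datetime(sensor_events["timestamp"])

# 3. Analyze: a boolean filter standing in for a KQL `where` clause.
hot = sensor_events[
    (sensor_events["sensor_type"] == "temperature") & (sensor_events["value"] > 30)
]

# 4. Act: build alert messages (a placeholder for an Activator action).
alerts = [
    f"ALERT {row.warehouse_id}: temperature {row.value}°C exceeds 30°C"
    for row in hot.itertuples(index=False)
]
print(alerts)
```

Only the temperature reading crosses its threshold here, so a single alert is produced.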


Step 2: Load and Explore Sensor Events

The dataset has 50 sensor events from 5 warehouses covering all 4 sensor types:

import pandas as pd

events = pd.read_csv("lab-051/sensor_events.csv")
print(f"Total events:   {len(events)}")
print(f"Warehouses:     {events['warehouse_id'].nunique()}")
print(f"Sensor types:   {sorted(events['sensor_type'].unique())}")
print(f"\nEvents per warehouse:")
print(events["warehouse_id"].value_counts().sort_index())
print(f"\nEvents per sensor type:")
print(events["sensor_type"].value_counts().sort_index())

Expected output:

Total events:   50
Warehouses:     5
Sensor types:   ['door_opens', 'humidity', 'stock_level', 'temperature']

Events per warehouse:
WH-CHI    10
WH-DAL    10
WH-LAX    10
WH-NYC    10
WH-SEA    10

Events per sensor type:
door_opens     12
humidity       12
stock_level    13
temperature    13

Preview the Data

print(events[["timestamp", "warehouse_id", "sensor_type", "value"]].head(10).to_string(index=False))

Each event has a timestamp, warehouse_id, sensor_type, and numeric value. This is the shape of data that an Eventstream would deliver into an Eventhouse table.


Step 3: KQL-Style Anomaly Detection

In production Fabric, you'd write KQL queries against the Eventhouse. Here we use pandas to replicate the same logic: every pandas filter maps directly to a KQL where clause.

3a. Temperature Anomalies (> 30°C)

KQL equivalent:

SensorEvents
| where sensor_type == "temperature"
| where value > 30
| project timestamp, warehouse_id, value
| order by value desc

Python equivalent:

temp = events[events["sensor_type"] == "temperature"]
temp_anomalies = temp[temp["value"] > 30].sort_values("value", ascending=False)
print(f"🌡️ Temperature anomalies (> 30°C): {len(temp_anomalies)}")
print(temp_anomalies[["timestamp", "warehouse_id", "value"]].to_string(index=False))

Expected output:

🌡️ Temperature anomalies (> 30°C): 3
          timestamp warehouse_id  value
 2026-06-15 14:20:00       WH-NYC   38.0
 2026-06-15 11:45:00       WH-DAL   35.0
 2026-06-15 09:30:00       WH-DAL   32.0

Critical Alert

WH-NYC at 38°C is dangerously high: perishable goods and electronics can be damaged above 35°C. An Activator rule would immediately notify the NYC warehouse manager and trigger the HVAC system.

3b. Stock Critical (< 10 Units)

KQL equivalent:

SensorEvents
| where sensor_type == "stock_level"
| where value < 10
| project timestamp, warehouse_id, value
| order by value asc

Python equivalent:

stock = events[events["sensor_type"] == "stock_level"]
stock_critical = stock[stock["value"] < 10].sort_values("value")
print(f"📦 Stock critically low (< 10 units): {len(stock_critical)}")
print(stock_critical[["timestamp", "warehouse_id", "value"]].to_string(index=False))

Expected output:

📦 Stock critically low (< 10 units): 2
          timestamp warehouse_id  value
 2026-06-15 13:00:00       WH-LAX    3.0
 2026-06-15 10:15:00       WH-LAX    8.0

Insight

Both critical stock events are at WH-LAX: stock dropped from 8 units at 10:15 to 3 units at 13:00. An AI agent would detect this trend and recommend an emergency restock before the warehouse runs out completely.
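
To make the trend concrete, here is a small worked calculation using the two WH-LAX readings from the expected output. The linear extrapolation to an empty bin is an assumption made for illustration; the lab itself does not prescribe a forecasting method.

```python
import pandas as pd

# The two WH-LAX stock readings shown in the expected output above.
lax_stock = pd.DataFrame({
    "timestamp": pd.to_datetime(["2026-06-15 10:15:00", "2026-06-15 13:00:00"]),
    "value": [8.0, 3.0],
})

first, last = lax_stock.iloc[0], lax_stock.iloc[-1]

# Hours between the two readings.
elapsed_hours = (last["timestamp"] - first["timestamp"]).total_seconds() / 3600

# Negative rate means stock is falling.
units_per_hour = (last["value"] - first["value"]) / elapsed_hours

# Assumption: depletion continues at the same linear rate.
hours_to_empty = last["value"] / -units_per_hour

print(f"Elapsed: {elapsed_hours:.2f} h")
print(f"Rate: {units_per_hour:.2f} units/h")
print(f"Projected time to empty: {hours_to_empty:.2f} h")
```

With 5 units consumed in 2.75 hours, the rate is about 1.82 units per hour, which would leave roughly 1.65 hours of stock if nothing changes.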

3c. Humidity Alerts (> 80%)

KQL equivalent:

SensorEvents
| where sensor_type == "humidity"
| where value > 80
| project timestamp, warehouse_id, value

Python equivalent:

humidity = events[events["sensor_type"] == "humidity"]
humidity_alerts = humidity[humidity["value"] > 80]
print(f"💧 Humidity alerts (> 80%): {len(humidity_alerts)}")
print(humidity_alerts[["timestamp", "warehouse_id", "value"]].to_string(index=False))

Expected output:

💧 Humidity alerts (> 80%): 1
          timestamp warehouse_id  value
 2026-06-15 15:10:00       WH-CHI   85.0
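
The three filters in this step share one shape: pick a sensor type, compare its value against a threshold. One possible way to express that, sketched below, is a single rule table applied in one pass. The `RULES` dictionary and `find_anomalies` helper are hypothetical names introduced here, and the sample rows are hand-made rather than read from the lab CSV; the thresholds come from the scenario table.

```python
import operator

import pandas as pd

# Thresholds from the scenario table; the RULES structure is hypothetical.
RULES = {
    "temperature": (operator.gt, 30),  # > 30°C
    "humidity": (operator.gt, 80),     # > 80%
    "stock_level": (operator.lt, 10),  # < 10 units
}


def find_anomalies(events: pd.DataFrame) -> pd.DataFrame:
    """Return the rows that violate the rule for their sensor type."""
    mask = pd.Series(False, index=events.index)
    for sensor_type, (compare, threshold) in RULES.items():
        is_type = events["sensor_type"] == sensor_type
        mask |= is_type & compare(events["value"], threshold)
    return events[mask]


# Hand-made sample rows (not the lab dataset).
sample = pd.DataFrame({
    "warehouse_id": ["WH-NYC", "WH-LAX", "WH-CHI", "WH-SEA"],
    "sensor_type": ["temperature", "stock_level", "humidity", "temperature"],
    "value": [38.0, 3.0, 85.0, 22.0],
})

anomalies = find_anomalies(sample)
print(anomalies[["warehouse_id", "sensor_type", "value"]].to_string(index=False))
```

Keeping the thresholds in one place means a new sensor type needs only a new entry in the table, not a new filter block.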

Step 4: Warehouse Activity Analysis

Analyze door open events per warehouse. Unusually high activity may indicate security concerns or peak shipping periods:

doors = events[events["sensor_type"] == "door_opens"]
door_activity = doors.groupby("warehouse_id")["value"].sum().reset_index()
door_activity.columns = ["Warehouse", "Total Door Opens"]
door_activity = door_activity.sort_values("Total Door Opens", ascending=False)
print("🚪 Door Activity by Warehouse:")
print(door_activity.to_string(index=False))
print(f"\nMost active: {door_activity.iloc[0]['Warehouse']} "
      f"({int(door_activity.iloc[0]['Total Door Opens'])} total door opens)")

Expected output:

🚪 Door Activity by Warehouse:
 Warehouse  Total Door Opens
    WH-DAL              14.0
    WH-NYC              12.0
    WH-SEA              10.0
    WH-CHI               9.0
    WH-LAX               7.0

Most active: WH-DAL (14 total door opens)

Insight

WH-DAL leads with 14 total door opens. Combined with its two temperature anomalies (32°C and 35°C), frequent door openings could be letting hot air in. An AI agent would correlate these signals: "Dallas warehouse has high door activity AND rising temperatures. Consider adding an air curtain to loading dock 3."
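
The correlation described above can be sketched as a join between the door totals and the temperature anomaly counts. The values are copied from the expected outputs in Steps 3a and 4; the flagging rule (above-median door activity plus at least two temperature anomalies) is a hypothetical heuristic chosen for illustration, not something the lab defines.

```python
import pandas as pd

# Values copied from the expected outputs in Steps 3a and 4.
door_totals = pd.DataFrame({
    "warehouse_id": ["WH-DAL", "WH-NYC", "WH-SEA", "WH-CHI", "WH-LAX"],
    "door_opens": [14, 12, 10, 9, 7],
})
temp_anomalies = pd.DataFrame({
    "warehouse_id": ["WH-NYC", "WH-DAL", "WH-DAL"],
    "value": [38.0, 35.0, 32.0],
})

# Count temperature anomalies per warehouse.
temp_counts = (
    temp_anomalies.groupby("warehouse_id")
    .size()
    .rename("temp_anomalies")
    .reset_index()
)

# Left join so warehouses without anomalies are kept with a count of 0.
combined = door_totals.merge(temp_counts, on="warehouse_id", how="left")
combined["temp_anomalies"] = combined["temp_anomalies"].fillna(0).astype(int)

# Hypothetical rule: above-median door activity AND 2+ temperature anomalies.
median_doors = combined["door_opens"].median()
flagged = combined[
    (combined["door_opens"] > median_doors) & (combined["temp_anomalies"] >= 2)
]
print(flagged["warehouse_id"].tolist())
```

Under this rule only WH-DAL is flagged: WH-NYC also has above-median door activity but just one temperature anomaly.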


Step 5: Build an Alert Dashboard

Combine all anomalies into a single summary dashboard that an AI agent would present to a warehouse operations manager:

temp_count = len(temp_anomalies)
stock_count = len(stock_critical)
humidity_count = len(humidity_alerts)
total_anomalies = temp_count + stock_count + humidity_count

# Affected warehouses
affected = set()
affected.update(temp_anomalies["warehouse_id"].tolist())
affected.update(stock_critical["warehouse_id"].tolist())
affected.update(humidity_alerts["warehouse_id"].tolist())

dashboard = f"""
╔══════════════════════════════════════════════════╗
║       Fabric RTI - Anomaly Alert Dashboard       ║
╠══════════════════════════════════════════════════╣
║ Total Events Analyzed:  {len(events):>5}                     ║
║ Warehouses Monitored:   {events['warehouse_id'].nunique():>5}                     ║
║ ─────────────────────────────────────────────── ║
║ 🌡️  Temperature Alerts:  {temp_count:>5}  (> 30°C)            ║
║ 📦 Stock Critical:       {stock_count:>5}  (< 10 units)        ║
║ 💧 Humidity Alerts:      {humidity_count:>5}  (> 80%)             ║
║ ─────────────────────────────────────────────── ║
║ ⚠️  Total Anomalies:     {total_anomalies:>5}                     ║
║ 🏭 Warehouses Affected: {len(affected):>5}  ({', '.join(sorted(affected))})  ║
║ 🚪 Most Active:         WH-DAL (14 door opens)  ║
╚══════════════════════════════════════════════════╝

Priority Actions:
  1. 🔴 WH-NYC: Temperature at 38°C, check HVAC immediately
  2. 🔴 WH-LAX: Stock at 3 units, trigger emergency reorder
  3. 🟡 WH-DAL: Two temperature spikes + high door activity
  4. 🟡 WH-CHI: Humidity at 85%, activate dehumidifiers
"""
print(dashboard)

Expected output:

╔══════════════════════════════════════════════════╗
║       Fabric RTI - Anomaly Alert Dashboard       ║
╠══════════════════════════════════════════════════╣
║ Total Events Analyzed:     50                     ║
║ Warehouses Monitored:       5                     ║
║ ─────────────────────────────────────────────── ║
║ 🌡️  Temperature Alerts:      3  (> 30°C)            ║
║ 📦 Stock Critical:           2  (< 10 units)        ║
║ 💧 Humidity Alerts:          1  (> 80%)             ║
║ ─────────────────────────────────────────────── ║
║ ⚠️  Total Anomalies:         6                     ║
║ 🏭 Warehouses Affected:     4  (WH-CHI, WH-DAL, WH-LAX, WH-NYC)  ║
║ 🚪 Most Active:         WH-DAL (14 door opens)  ║
╚══════════════════════════════════════════════════╝

AI Agent Integration

In production, an AI agent would query the Eventhouse via KQL, run this anomaly detection logic, and post a natural-language summary to a Teams channel or email. Fabric Activator handles the automated alerting, while the AI agent provides the interpretation, turning raw numbers into actionable recommendations.
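
As a minimal stand-in for the interpretation step, the sketch below turns the six anomalies found in Step 3 into a plain-language summary using fixed templates. The `Anomaly` class, `TEMPLATES` mapping, and `summarize` function are hypothetical; a real agent would likely call a language model and post the result to Teams or email, neither of which is shown here.

```python
from dataclasses import dataclass


@dataclass
class Anomaly:
    warehouse_id: str
    sensor_type: str
    value: float


# Hypothetical wording, loosely following the priority actions in Step 5.
TEMPLATES = {
    "temperature": "{wh}: temperature at {v:g}°C, check HVAC",
    "stock_level": "{wh}: stock at {v:g} units, trigger a reorder",
    "humidity": "{wh}: humidity at {v:g}%, activate dehumidifiers",
}


def summarize(anomalies: list[Anomaly]) -> str:
    """Build a short plain-text summary, one line per anomaly."""
    warehouses = {a.warehouse_id for a in anomalies}
    header = f"{len(anomalies)} anomalies across {len(warehouses)} warehouses"
    lines = [
        TEMPLATES[a.sensor_type].format(wh=a.warehouse_id, v=a.value)
        for a in anomalies
    ]
    return "\n".join([header, *lines])


# The six anomalies reported in Step 3.
found = [
    Anomaly("WH-NYC", "temperature", 38.0),
    Anomaly("WH-DAL", "temperature", 35.0),
    Anomaly("WH-DAL", "temperature", 32.0),
    Anomaly("WH-LAX", "stock_level", 3.0),
    Anomaly("WH-LAX", "stock_level", 8.0),
    Anomaly("WH-CHI", "humidity", 85.0),
]

summary = summarize(found)
print(summary)
```

Templates keep the output deterministic and easy to test; swapping in a model-generated summary would change only the body of `summarize`.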


πŸ› Bug-Fix ExerciseΒΆ

The file lab-051/broken_alerting.py has 3 bugs that produce incorrect anomaly detection results. Run the self-tests:

python lab-051/broken_alerting.py

You should see 3 failed tests:

Test | What it checks | Hint
Test 1 | Temperature threshold is parameterized | The threshold is hardcoded to 50 instead of using the threshold parameter
Test 2 | Stock alert uses correct comparison | Stock alerts trigger when value is below the threshold, not above
Test 3 | Anomaly rate is calculated per warehouse | The denominator should be events per warehouse, not total events across all warehouses

Fix all 3 bugs and re-run until you see 🎉 All 3 tests passed.


🧠 Knowledge Check

Q1 (Multiple Choice): What is an Eventstream in Microsoft Fabric?
  • A) A batch data processing pipeline that runs on a schedule
  • B) A managed real-time data ingestion pipeline that captures streaming events
  • C) A visualization tool for creating dashboards
  • D) A machine learning model training service
✅ Reveal Answer

Correct: B) A managed real-time data ingestion pipeline that captures streaming events

An Eventstream is the entry point for real-time data in Fabric. It captures events from sources like IoT Hub, Kafka, or custom applications, transforms them in-flight, and routes them to destinations like an Eventhouse for querying. Unlike batch pipelines, Eventstreams process data continuously with sub-second latency.

Q2 (Multiple Choice): What does Fabric Activator do?
  • A) Optimizes KQL query performance
  • B) Manages Eventhouse storage capacity
  • C) Triggers automated actions when data conditions are met
  • D) Converts batch data into streaming format
✅ Reveal Answer

Correct: C) Triggers automated actions when data conditions are met

Activator is Fabric's rule engine for real-time alerting. You define conditions (e.g., "temperature > 30°C") and actions (e.g., send a Teams notification, trigger a Power Automate flow). It continuously monitors KQL query results and fires alerts the moment a condition is met, with no polling required.

Q3 (Run the Lab): How many temperature readings exceed 30°C?

Filter the events DataFrame for sensor_type == "temperature" and value > 30.

✅ Reveal Answer

3

Three temperature anomalies exceed 30°C: WH-NYC at 38°C, WH-DAL at 35°C, and WH-DAL at 32°C. The NYC reading at 38°C is the most critical: it is well above the 30°C alert threshold and also exceeds the 35°C level that Step 3a notes can damage perishable goods and electronics.

Q4 (Run the Lab): Which warehouse has the most door_opens?

Filter for sensor_type == "door_opens", group by warehouse_id, and sum the values.

✅ Reveal Answer

WH-DAL (14 total door opens)

Dallas leads with 14 total door opens, followed by WH-NYC (12), WH-SEA (10), WH-CHI (9), and WH-LAX (7). Combined with Dallas's two temperature anomalies, the high door activity may be contributing to heat buildup.

Q5 (Run the Lab): How many stock readings are critically low (< 10 units)?

Filter for sensor_type == "stock_level" and value < 10.

✅ Reveal Answer

2

Two stock readings at WH-LAX are critically low: 8 units and 3 units. Both events are at the same warehouse, suggesting a rapid stock depletion trend that requires an emergency reorder.


Summary

Topic | What You Learned
Eventstreams | Real-time ingestion of IoT sensor data into Fabric
Eventhouse & KQL | Columnar storage and query language for time-series analytics
Anomaly Detection | Threshold-based alerts for temperature, humidity, and stock levels
Activator | Automated actions triggered by data conditions
AI Agent Integration | Agents query Eventhouse data and generate actionable recommendations
Alert Dashboard | Combining multiple anomaly types into a unified operations view

Next Steps

  • Lab 052 - Fabric RTI Advanced: Sliding-Window Aggregations & Trend Detection
  • Lab 053 - Building an AI Agent with Fabric Activator & Semantic Kernel