Verversen van SQL Endpoint metadata in Microsoft Fabric
Waarom is dit nodig?
Microsoft Fabric (lees meer hierover in Wat is Microsoft Fabric? Een Complete Uitleg en Voordelen) biedt een krachtige manier om data op te slaan en te analyseren via een Lakehouse. Een belangrijk onderdeel hiervan is het SQL Endpoint, dat automatisch een schema genereert op basis van de onderliggende Lakehouse-tabellen. Echter, zoals beschreven in dit artikel, kan er vertraging optreden in de verversing van de SQL Endpoint metadata. Dit betekent dat nieuwe tabellen en wijzigingen niet direct beschikbaar zijn in het SQL Endpoint, wat problemen kan veroorzaken bij analyses en rapportages. Om de data kwaliteit te kunnen waarborgen tussen de verschillende lagen van de medaillion architectuur kan dit script je helpen (lees meer over deze manier van data structureren De Medallion Architectuur in Microsoft Fabric: Structuur en Datastromen.)
De Oplossing
Totdat Microsoft een officiële fix uitrolt of een expliciet metadata-ververs endpoint introduceert, kunnen we een tijdelijk script inzetten. Dit script ververst de metadata van het Lakehouse met het SQL Endpoint, waardoor schema-aanpassingen sneller worden verwerkt en zichtbaar zijn.
Wanneer het script starten?
Het is verstandig om dit script uit te voeren voordat je afhankelijk bent van het SQL Endpoint voor verdere stappen. Denk hierbij aan:
- Het verversen van semantische modellen.
- Het uitvoeren van SQL-statements binnen het datawarehouse.
- Het draaien van stored procedures.
Door eerst de metadata van het Lakehouse te synchroniseren, minimaliseer je de kans dat queries mislukken. Dit voorkomt dat tabellen of schema-aanpassingen nog niet zichtbaar zijn of dat de meest recente data ontbreekt.
Hoe werkt het verversen van het SQL endpoint?
Het script voert de volgende stappen uit:
- Optioneel: Het creëert testdata om de synchronisatie te valideren.
- Ophalen van SQL Endpoint ID: Het script haalt de ID van het SQL Endpoint op via de Fabric API.
- Uitvoeren van een Metadata Refresh: Het verstuurt een API-call om het SQL Endpoint te dwingen een metadata-verversing uit te voeren.
- Polling voor voltooiing: Het script checkt periodiek of de synchronisatie is voltooid.
- Resultaten tonen: Bij een succesvolle synchronisatie worden de statusdetails van de tabellen weergegeven. Bij een foutmelding worden de details gelogd.
Hieronder het volledige pyspark script:
import time
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import IntegerType, StructType, StructField, TimestampType
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
def sync_lakehouse_with_sql(tenant_id, workspace_id, lakehouse_id, create_test_tables=False, refresh_metadata=False):
"""
Synchroniseert een Fabric Lakehouse met het SQL Endpoint.
"""
try:
if create_test_tables:
create_test_data()
if refresh_metadata:
client = fabric.FabricRestClient()
response = client.get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}")
lakehouse_info = response.json()
sql_endpoint_id = lakehouse_info['properties']['sqlEndpointProperties']['id']
uri = f"/v1.0/myorg/lhdatamarts/{sql_endpoint_id}"
payload = {"commands": [{"$type": "MetadataRefreshExternalCommand"}]}
response = client.post(uri, json=payload)
response_data = response.json()
batch_id = response_data.get("batchId")
progress_state = response_data.get("progressState")
status_uri = f"/v1.0/myorg/lhdatamarts/{sql_endpoint_id}/batches/{batch_id}"
while progress_state == 'inProgress':
time.sleep(1)
status_response = client.get(status_uri).json()
progress_state = status_response.get("progressState")
print(f"Sync state: {progress_state}")
if progress_state == 'success':
return "Synchronisatie succesvol"
elif progress_state == 'failure':
print("Synchronisatie mislukt:", json.dumps(status_response, indent=4))
return status_response
except (FabricHTTPException, WorkspaceNotFoundException) as e:
print(f"Fout: {e}")
return {"error": str(e)}
def create_test_data():
"""
Creëert testtabellen in PySpark om de synchronisatie te valideren.
"""
spark = SparkSession.builder.appName("FabricSyncTest").getOrCreate()
schema = StructType([
StructField("id", IntegerType(), False),
StructField("timestamp", TimestampType(), False)
])
df = spark.createDataFrame([(i,) for i in range(1, 1001)], StructType([StructField("id", IntegerType(), False)]))
df = df.withColumn("timestamp", current_timestamp())
spark.sql("DROP TABLE IF EXISTS test1")
df.write.mode("overwrite").saveAsTable("test1")
df_dup = spark.table("test1")
df_dup.write.mode("overwrite").save("Tables/Test2")
print("Testtabellen 'test1' en 'Test2' aangemaakt.")
Conclusie: ververs SQL Endpoint metadata nu
Dit script biedt een tijdelijke oplossing voor vertraagde schema-updates in het SQL Endpoint van Fabric Lakehouse. Microsoft is op de hoogte van dit probleem en werkt mogelijk aan een definitieve oplossing. Tot die tijd helpt deze workaround om consistentie in rapportages en analyses te garanderen.
Hulp nodig bij de implementatie van dit script, of verbeteringen? Neem gerust contact op.