Automatická údržba delta tabulek ve Fabric Lakehouse (bez PySparku)
- Vojtěch Šíma
- Feb 15
- 10 min read
tl;dr Delta tabulky fungují ve zkratce tak, že při transakcích (zápisy, přepis řádků atd.) generují nové soubory (JSON, Parquet). Tím může vznikat velké množství souborů, které je nutné čistit. Pokud chcete ušetřit CU (Capacity Units) eliminací Spark Jobů, lze využít knihovnu delta-rs. Ta nabízí standardní metody jako optimize (compact, zorder), vacuum a další. Pro V-Order je aktuálně nezbytný Spark Job. Více detailů nalezneš v článku.
Jak delta tabulky fungují
Delta log (_delta_log)
Pro pochopení potřeb údržby je nutné porozumět principu fungování Delta tabulek. Klíčovou komponentou je transakční log (nebo také delta log, pojmy budu střídat). Delta log je složka obsahující seřazené logy ve formátu JSON, které popisují jednotlivé commity. Každá transakce (commit) tabulky vytvoří nový JSON soubor a přispívá k další verzi tabulky. Jedna transakce obvykle nepopisuje celou tabulku, pouze zaznamenává změny.
Jednotlivé soubory jsou JSONy s názvy doplněnými nulami; vypadají následovně:
00000000000000000600.jsonCheckpointy (*.checkpoint.parquet)
Pro efektivnější zpracování commit logů mají Delta tabulky mechanismus checkpointů (kontrolních bodů), který pomáhá čtečkám rychleji "dohnat" aktuální stav. Checkpoint je soubor Parquet ve složce delta logu, který konsoliduje stav transakčního logu k určité verzi. Čtečka tak nemusí přehrávat celý log od začátku. Původní dokumentace navrhuje checkpoint po 10 commitech, ale nespoléhej na to jako na pevné pravidlo. Snapshot lze vytvořit i na vyžádání.
00000000000000000599.checkpoint.parquetLatest checkpoint pointer (_last_checkpoint)
Protože může existovat mnoho checkpointů, existuje i soubor s ukazatelem nazvaný _last_checkpoint. Jedná se o malý JSON soubor odkazující na nejnovější checkpoint, takže čtečky mohou skočit přímo na něj namísto skenování rozsáhlého adresáře _delta_log.
Poznámka: Tento soubor technicky nemá příponu, ale čte se jako JSON.
Představ si poslední checkpoint doslova jako "uložení hry" (game save). Při čtení tabulky se "objevíš" u posledního checkpointu. Jakýkoli postup poté (novější než checkpoint) je efektivně "neuložený postup", který musí engine přečíst a přehrát pro získání aktuálního stavu. Ve zkratce: pokud je checkpoint verze 850, začínáš tam. Ale pro zobrazení aktuálního stavu musí engine přečíst i každý JSON nad 850 (851, 852 atd.) a aplikovat tyto změny.
_last_checkpointDatové soubory Parquet
Druhou komponentou Delta tabulek jsou samotná data. Data se zapisují jako soubory Parquet. Parquet je neměnný (immutable), soubory se tedy nepřepisují, ale píší se nové. Například append přidá kompletně nový soubor; Update mrkne na řádek, který opravuješ, a vytvoří nový soubor se změněným řádkem plus zbytek z originálního souboru.
7246655d-957d-4c83-aab5-cb9a2a21ce1a.parquetpart-00000-9800ad29-a568-4a9c-9273-df62e566c7b4-c000.snappy.parquetPartitions (oddíl)
Pokud data využívají partitiony, struktura složek Delta tabulky se rozšíří tak, že každá unikátní hodnota partition sloupce získá vlastní složku ve tvaru partition_column=column_value a soubory Parquet pro tuto hodnotu leží uvnitř. Při více partition sloupcích jsou složky vnořené (např. year=2026/month=02/day=12). Jde pouze o fyzické rozložení na úložišti, které pomáhá dotazům přeskakovat nerelevantní data (partition pruning), ale mnoho unikátních hodnot může vytvořit velké množství složek a souborů.
Ačkoliv by údržba technicky měla validovat i partitiony, tento příspěvek se tímto nezabývá. Obecně (a velmi zjednodušeně): pokud potenciální partition bude mít alespoň 1 GB dat, použij ji, jinak ne.
date=2025-03-05Jak Delta tabulky čte (nejnovější) verze
Logika sestavení dotazovatelné verze tabulky je teoreticky prostá. Scénář: Máš transakci, která přidá tři řádky – zapíšeš Parquet soubor s těmito řádky. Později přidáš další řádek – zapíšeš opět nový Parquet soubor. Následně, pokud transakce modifikuje existující řádky, vezmou se soubory obsahující tyto řádky, zapíší se nové soubory s aktualizovanými řádky a starší nezměněné řádky z původních souborů se "svezou" s nimi.
Vizuálně to může vypadat takto:
Transaction | Operation | Delta Log Action | Physical File Content | Current Active Files (Snapshot) |
1 | Initial Write | add(File 1) | File 1: {row1, row2, row3} | File 1 |
2 | Append Data | add(File 2) | File 2: {row4} | File 1, File 2 |
3 | Update row1 | remove(File 1) add(File 3) | File 3: {row1*, row2, row3} (row2 & row3 "tag along") | File 2, File 3 |
4 | Update row3 | remove(File 3) add(File 4) | File 4: {row1*, row2, row3*} (row1 & row2 "tag along") | File 2, File 4 |
Některé implementace Delty podporují tzv. deletion vectors, které mohou zaznamenat smazání a některé aktualizace bez okamžitého přepsání celého Parquet souboru. Tabulka se z pohledu dotazu chová stejně, ale fyzický přepis se odloží na pozdější operaci 'zmenšování/zjednodušení' (compaction) nebo údržbu.
Rychlé signály před údržbou
Před provedením optimalizace nebo údržby je vhodné znát "zdravotní" stav Delta tabulky v rámci souborů: kolik aktivních Parquet souborů tvoří poslední verzi, zda jsou soubory malé nebo přiměřeně velké a zda existují staré nereferencované soubory čekající na úklid. I když údržbu pravděpodobně provedeš tak jako tak, kontrola zdraví a vlivu transakcí v čase je užitečná. Automatizace kontroly není součástí tohoto článku, ale zde jsou nástroje pro její provedení (věřím, že to zvládneš).
Vizuální kontrola
Pro prvotní pohled na strukturu složek přejdi do Lakehouse, rozbal Tables, vyber si jednu a klikni na ni, poté vyber Files view. Doporučuji vizuální check udělat, pokud jsi strukturu ještě nikdy neviděl.
Pokud používáš OneLake file explorer ve Windows, můžeš soubory procházet i tam.
Delta-rs
Jelikož se tento článek zaměřuje na operace bez Sparku, využijeme Delta-rs prostřednictvím Python balíčku deltalake. Jde o extrémně výkonný nástroj pro operace s Delta tabulkami; pro tento scénář využijeme data class DeltaTable.
Get add actions
Dobrý výchozí bod pro kontrolu zdraví. Výsledkem této metody je seznam všech souborů s akcí "add", které přispívají k aktuální verzi. U malých dat může jít o jeden konsolidovaný Parquet soubor. Pokud zde vidíš stovky či tisíce záznamů, měl bys na to hodit očko -> pravděpodobně je nutná údržba.
V Pythonu – spusť s připojeným Lakehousem:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
display(dt.get_add_actions())Pro hezčí výsledek přidej vestavěné to_pandas:
# previous config
display(dt.get_add_actions().to_pandas())History
Pro zobrazení celé historie operací využij metodu history(). Ta vypíše operace jako write, optimize a vacuum včetně tagů a parametrů. Pravidlo palce: vidíš-li mnoho zápisů (writes), ale žádné vacuum nebo optimize, proveď údržbu.
V Pythonu – spusť s připojeným Lakehousem:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
display(dt.history())Dry vacuum
Vacuum je operace odstraňující nereferencované datové soubory starší než retenční práh. Ve Fabricu je výchozí retence souborů 7 dní v OneLake, soubory tedy musí být starší než tato doba, aby byly způsobilé ke smazání.
Pokud chceš nejprve vidět, co lze smazat, spusť metodu dry_vacuum.
V Pythonu – spusť s připojeným Lakehousem:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
display(dt.vacuum(dry_run=True))Výše uvedený kód používá výchozí nastavení retenční doby. Pro plnou kontrolu použij následující:
# previous config
display(dt.vacuum(dry_run=True, retention_hours=0, enforce_retention_duration=False))Vždy dvakrát zkontroluj, že dry_run = True, jinak soubory skutečně smažeš.
Poznámka: VACUUM neodstraňuje transakční logy; proto je dry run nevypíše. Pro identifikaci logů k smazání je nutné filtrovat historii tabulky (history()) na položky starší než retenční doba logů. Logy se mažou automaticky po alespoň jednom checkpointu a překročení retenční doby.
Pro zobrazení konfigurace tabulky a nastavení retence použij:
# previous config
display(dt.metadata().configuration)Pokud je výsledek prázdný, tabulka používá výchozí nastavení: 30 dní pro logy a 7 dní pro soubory Parquet.
Alternativa: Audit fyzického úložiště
Iterování složek za účelem počítání souborů je pro kontrolu zdraví "anti-pattern". Hrubý počet je zavádějící, protože míchá aktivní data se zbytky (tombstoned files) a na tabulkách s partitions bývá pomalý. Stav Delty je definován transakčním logem, takže počty souborů ve složce mohou být šum.
Fyzická iterace je užitečná pro audit úložiště, protože ukazuje plnou stopu složky tabulky, včetně logů a smazaných souborů, které existují do spuštění VACUUM. Stále však neříká, co je bezpečné smazat. Pro akční signály spoléhej na metody transakčního logu (history(), get_add_actions()) a vacuum(dry_run=True).
Nástroje údržby Delta tabulek ve Fabric Lakehouse
Jak bylo zmíněno, zaměřuji se na Python notebooky mimo PySpark nebo SparkSQL. Proto se zaměříme na možnosti Delta-RS, jelikož umožňují provádět všechny povinné operace údržby bez šahání na Spark.
S tímto API nelze provést V-Order, ten lze aktuálně provést pouze přes Spark. Níže vysvětlím, co V-Order je a jak jej technicky spustit přes Fabric REST API, pokud je potřeba.
Než přejdeme k samotné implementaci (která bude velmi rychlá), vysvětlím nástroje, které budeme používat.
OPTIMIZE
Optimize je klíčová funkce Delta tabulek umožňující sloučení malých souborů do větších (bin-packing). Pokud máš mnoho dat, OPTIMIZE efektivně distribuuje soubory do správně velkých Parquet souborů. To výrazně prospívá výkonu následného čtení. Maximální velikost bloku lze konfigurovat; výchozí hodnota pro Spark Delta Lake je 1 GB. Pro delta-rs je cílová velikost souboru 256 MB.
Ve Fabricu lze proces optimalizace mírně upravit pomocí několika variant:
Z-Order
Z-ordering je technika pro umístění souvisejících informací do stejné sady souborů, což umožňuje enginu přeskakovat více souborů při čtení. Delta sbírá statistiky pro "data skipping" (min a max hodnoty) a Z-order zlepšuje užitečnost těchto statistik uspořádáním fyzického rozložení souborů podle sloupců, na které se často dotazuješ.
Je to skvělé, pokud víš, které sloupce uživatelé často filtrují. Přidání příliš mnoha sloupců obvykle znamená více práce při optimalizaci a klesající přínos, takže buď rozvážný.
V-Order
V-Order je optimalizace Parquetu při zápisu používaná v Microsoft Fabric. Microsoft ji popisuje jako třídění, distribuci row group, (en)kódování a kompresi, které zlepšují výkon čtení napříč enginy Fabricu.
Tvrzení Microsoftu: průměrně ~15% overhead při zápisu, až o 50% lepší komprese a zlepšení času čtení (Spark a jiné non-Verti-Scan enginy průměrně ~10% rychlejší čtení, v některých scénářích až 50%).
V nových workspacích bývá ve výchozím stavu vypnuto a doporučuje se hlavně pro scénáře náročné na čtení. Lze aplikovat i zpětně přepsáním souborů během optimalizace.
Jak bylo řečeno, V-Order nelze aktuálně provést přes delta-rs. Pokud jej potřebuješ, ukážu, jak na to později.
VACUUM
VACUUM čistí soubory, na které již neexistuje reference v aktuálním logu Delta tabulky. To je důležité, protože OPTIMIZE přepisuje soubory a staré ponechává, dokud je VACUUM po uplynutí retenčního prahu nesmaže.
Ve Fabricu je výchozí retenční práh souborů sedm dní. Nastavení kratší retence ovlivňuje Time Travel a může být riskantní při souběžných čtenářích a zapisovačích. Fabric také uvádí, že UI a veřejná API ve výchozím nastavení selžou pro retenční intervaly pod 7 dní, pokud nezakážeš kontrolu trvání retence.
Implementace nástrojů údržby s Delta-rs
Podívejme se na implementaci nástrojů údržby v Python notebooku. Rychlá poznámka před akcí: příkazy spouštěj mimo jiné změny, které by se mohly dít na Delta tabulkách. Některé metody mohou selhat, pokud dojde ke konfliktu s jinými operacemi údržby (zejména těmi, které odstraňují nebo přepisují soubory).
Pořadí těchto kroků lze také brát jako pořadí, které bys měl replikovat ve své pipeline.
Pokud jsi přeskočil přímo sem, doporučuji si nejprve přečíst o jednotlivých částech výše.
Pro každou metodu najdeš hloubkové vysvětlení a detaily parametrů v dokumentaci.
OPTIMIZE & Z-ORDER
Pro optimalizaci a bin-packing souborů využijeme metodu compact(). Samotný Optimize nemaže soubory, pouze vytváří nové a staré označí jako odstraněné, proto tento krok sám o sobě nestačí (viz Vacuum).
Pokud tato operace probíhá souběžně s jinými operacemi než append, selže.
Rozhodneš-li se, že tabulka profituje ze Z-orderingu, nespouštěj optimize dvakrát. Z-order je nadmnožinou compact, takže buď spusť samotný compact, nebo (chceš-li Z-order) pouze Z-order.
compact(partition_filters=None, target_size=None, max_concurrent_tasks=None, min_commit_interval=None)Extra tipy:
Když je zadáno partition_filters, optimalizuje se pouze daná partition nebo partitiony.
target_size je v bytech. 256 MB odpovídá 256*1024*1024 (nebo taky 2**8 * 1024**2).
max_concurrent_tasks má jako výchozí hodnotu počet CPU.
Spuštění compact() (bez nových změn) dvakrát neudělá nic.
Spuštění s výchozím nastavením:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
dt.optimize.compact()Pokud chceš přidat Z-order, definuj sloupce, se kterými má běžet:
z_order(columns, partition_filters=None, target_size=None, max_concurrent_tasks=None, max_spill_size=21474836480, min_commit_interval=None)Spuštění s výchozím nastavením a sloupcem 'timestamp':
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
dt.optimize.z_order(["timestamp"])Praktická poznámka: jakmile použiješ Z-order, nelze se snadno vrátit zpět. Buď přestaneš Z-order spouštět pro další běhy, nebo pokud se chceš rozložení zcela zbavit, musíš přepsat celou tabulku či zvolit jiný Z-order (protože Z-order fyzicky přepisuje datové soubory).
VACUUM
Po zjednodušení souborů je třeba odstranit staré "odpadní" soubory. Jak víme, samotný optimize nic z úložiště nemaže, pouze zapíše nové kompaktní soubory a staré označí jako odstraněné. Staré soubory tam zůstávají, dokud je Vacuum nesmaže.
Důležitá poznámka: pokud zjednodušíš data dnes, retenční okno pro odstraněné soubory začíná okamžikem, kdy byly logicky odstraněny (tombstoned) tímto commitem optimalizace. Proto spuštění Vacuum ihned poté pravděpodobně nesmaže právě demotované soubory z aktuálního běhu, pokud nevynutíš potlačení retence.
Jelikož však toto děláš periodicky, je spuštění Vacuum stále velmi důležité, protože odstraní soubory z předchozího běhu (např. před týdnem).
Pokud nepotřebuješ Time Travel (návrat ke starší verzi Delta tabulky), můžeš nastavit retenční periodu na 0 a vyčistit vše ihned, ale nedoporučuje se to. Může to rozbít Time Travel i dlouho běžící čtečky. Používej s extrémní opatrností.
Spuštění vacuum s ostatními parametry jako default:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
dt.vacuum(dry_run=False)Nedoporučený kompletní úklid:
# previous config
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
Úklid logů
Technicky lze čistit i logy. Děje se to automaticky po operacích checkpointu, ale načasování checkpointů může být napříč prostředími nekonzistentní, takže to lze řešit i ručně.
Výchozí retenční doba pro logy je 30 dní. Po checkpointingu jsou starší záznamy logu způsobilé k vyčištění na základě delta.logRetentionDuration. Snížením této retence omezuješ i to, jak daleko do historie se lze vrátit pomocí Time Travel.
Postup pro úklid logů může vypadat takto:
Volitelně: nastavit nové delta.logRetentionDuration
Vytvořit checkpoint
Vyčistit metadata
Zde je běh s novou retenční periodou nastavenou na 14 dní:
from deltalake import DeltaTable
workspace_id = 'workspace_guid'
lakehouse_id = 'lakehouse_guid'
table_name = 'your_delta_table_name'
dt = DeltaTable(f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}')
dt.alter.set_table_properties({"delta.logRetentionDuration": "interval 14 days"})
dt.create_checkpoint()
dt.cleanup_metadata()Pro kontrolu vlastních konfigurací (jako změněná retenční doba) spusť toto (pokud je prázdné, používáš defaulty):
# previous config
dt.metadata().configurationJeště jedna praktická poznámka: existují zprávy, že v některých verzích nebo scénářích se smíšenými zapisovači nebyla vlastní delta.logRetentionDuration respektována přesně dle očekávání, takže to ověř na nekritické kopii tabulky.
Údržba V-Order pro Delta tabulky ve Fabric Lakehouse
Jak bylo zmíněno, v době psaní článku není V-Order přítomen v nástrojích údržby delta-rs. Microsoft také nenabízí metody, jak jej aplikovat bez Sparku.
Existuje Fabric REST API, které lze technicky volat bez Sparku, ale na pozadí se stejně jedná o Spark job.
Osobně jsem V-Order zatím nebenchmarkoval, takže nemohu hovořit s patřičnou jistotou. Ostatní, kteří benchmarky dělali, však poukazují na to, že V-Order rozhodně není funkce typu "vždy zapnuto". V-Order má overhead při zápisu, tedy i nákladový overhead, s benefitem rychlejšího čtení a teoreticky levnějších čtení.
Pro scénáře vyžadující připojení Direct Lake nebo v případě Warehouse s partitions může být V-Order přínosem.
Kontrola OPTIMIZE s V-Order (a Z-Order) přes Spark SQL
V souvislosti s údržbou delta tabulek můžeš napsat následující Spark SQL pro řízení optimalizace s V-Order a Z-Order (mohou fungovat společně). Tyto formáty příkazů dokumentuje Microsoft Fabric:
%%sql
OPTIMIZE <table|fileOrFolderPath> VORDER;
OPTIMIZE <table|fileOrFolderPath> WHERE <predicate> VORDER;
OPTIMIZE <table|fileOrFolderPath> WHERE <predicate> [ZORDER BY (col_name1, col_name2, ...)] VORDER;Když se použijí ZORDER a VORDER společně, Apache Spark provede sekvenčně bin-compaction, ZORDER a VORDER.
U tabulek s partitions můžeš také mrknout na "Optimize write". To by mělo pomoci zapisovat optimální partitiony (velikost). Více o tom zde.
Alternativa pro údržbu (Preview)
Pokud chceš zkusit něco nového (stále v preview), můžeš využít Fabric REST API pro údržbu tabulek (Table Maintenance), které pomůže provést vše v jednom volání.
Toto API je explicitně označeno jako preview a nedoporučuje se pro produkční použití. Je ale dobré vědět o jeho existenci.
Tato operace spustí job na pozadí a umožní optimalizaci s V-Order, Z-Order a vacuum s volitelným intervalem (retenční periodou).
Lze spustit v Python notebooku. Základní "chatty" kód s ošetřením stavů může vypadat takto:
import requests
import json
import time
from datetime import datetime
token = notebookutils.credentials.getToken("pbi")
workspace_id = "<workspace_id>"
lakehouse_id = "<lakehouse_id>"
table_name = "<table_name>"
schema = "dbo" #dbo is default
shared_headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"executionData": {
"tableName": table_name,
"schemaName": schema,
"optimizeSettings": {
"vOrder": True
},
"vacuumSettings": {
"retentionPeriod": "7:01:00:00"
}
}
}
def run_maintenance_request():
return requests.post(
f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/jobs/TableMaintenance/instances",
headers=shared_headers,
json=payload
)
while True:
response = run_maintenance_request()
if response.status_code == 429:
print("Too Many Requests")
retry_after = int(response.headers.get("Retry-After", 60))
time.sleep(retry_after)
continue
if response.status_code == 202:
print("Submitted. Looping for updates.")
status_url = response.headers.get("Location")
retry_after = int(response.headers.get("Retry-After", 60))
while True:
time.sleep(retry_after)
status_resp = requests.get(status_url, headers=shared_headers)
status_data = status_resp.json()
state = status_data.get("status")
print(f"Current Status: {state} - {datetime.utcnow()}")
if state in ["Completed", "Failed", "Cancelled", "Deduped"]:
if state == "Failed":
print("Error Details:", status_data.get("failureReason", "No details provided"))
break
break
print(f"Failed to submit: {response.text}")



Comments