Introduction
a continuous variable for four different products. The machine learning pipeline was built in Databricks and has two main components:
- Feature preparation in SQL with serverless compute.
- Inference on an ensemble of several hundred models, using job clusters to retain control over compute power.
In our first attempt, a 420-core cluster spent almost 10 hours processing just 18 partitions.
The objective is to tune the data flow to maximize cluster utilization and ensure scalability. Inference is performed on four sets of ML models, one set per product. However, we will focus on how the data is stored, since that determines how much parallelism we can leverage for inference. We will not cover the internal workings of the inference itself.
If there are too few file partitions, the cluster will spend a long time scanning large files, and at that point, unless the data is repartitioned (which means added network latency and data shuffling), you may also be running inference on a very large set of rows in each partition, again resulting in long run times.
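To make this concrete, here is a back-of-the-envelope sketch in the spirit of our first attempt (420 cores, 18 partitions); the row count is an assumed illustrative figure, not a measurement:

```python
# Back-of-the-envelope: with far fewer partitions than cores, most of the
# cluster sits idle while each task chews through a huge slice of rows.
total_rows = 354_000_000   # assumed row count for the largest product
cores = 420                # total cores in the job cluster
partitions = 18            # file partitions Spark actually scheduled

rows_per_task = total_rows // partitions
utilization = partitions / cores

print(f"rows per task: {rows_per_task:,}")        # 19,666,666
print(f"core utilization: {utilization:.1%}")     # 4.3%
```

With 18 tasks on 420 cores, roughly 96% of the cluster is idle no matter how fast the models are.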

However, the business has limited patience when shipping ML pipelines with a direct impact on the organization, so tests are limited.
In this article, we will review our feature data landscape, then give an overview of the ML inference, and present the results and discussion of inference performance under four dataset treatment scenarios:
- Partitioned table, no salt, no row limit in partitions (non-salted and Partitioned)
- Partitioned table, salted, with a 1M row limit (salty and Partitioned)
- Liquid-clustered table, no salt, no row limit in partitions (non-salted and Liquid)
- Liquid-clustered table, salted, with a 1M row limit (salty and Liquid)
Data Landscape
The dataset contains the features that the set of ML models uses for inference. It has ~550M rows and covers four products, identified by the attribute ProductLine:
- Product A: ~10.45M (1.9%)
- Product B: ~4.4M (0.8%)
- Product C: ~100M (17.6%)
- Product D: ~354M (79.7%)
It also has another low-cardinality attribute, AttrB, which contains only two distinct values and is used as a filter to extract subsets of the dataset for each part of the ML system.
Moreover, RunDate logs the date when the features were generated. They are append-only. Finally, the dataset is read using the following query:
SELECT
Id,
ProductLine,
AttrB,
AttrC,
RunDate,
{model_features}
FROM
catalog.schema.FeatureStore
WHERE
ProductLine = :product AND
AttrB = :attributeB AND
RunDate = :RunDate
Salt Implementation
The salting here is generated dynamically. Its purpose is to distribute the data according to the volumes: large products receive more buckets and smaller products receive fewer. For instance, Product D should receive around 80% of the buckets, given the proportions in the data landscape.
We do this so we can have predictable inference run times and maximize cluster utilization.
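Before the PySpark implementation, here is a plain-Python sketch of the bucket math; the shares and row counts are made up for illustration:

```python
# Sketch of the bucket allocation: each (ProductLine, AttrB) group gets a
# number of salt buckets proportional to its share of the rows.
min_buckets, max_buckets = 10, 1160

def buckets_for(percent: float, row_count: int) -> int:
    base = int(min_buckets + percent * (max_buckets - min_buckets))
    # one extra bucket when the rows don't divide evenly across buckets
    return base + 1 if row_count % base != 0 else base

# Illustrative shares, not the real per-(ProductLine, AttrB) numbers
print(buckets_for(0.40, 220_000_000))  # 471
print(buckets_for(0.01, 5_000_000))    # 22
```

A 40% slice lands near 470 buckets while a 1% slice gets barely more than the floor, which is exactly the proportional behavior we want.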
from pyspark.sql import functions as F

# Calculate the share of each (ProductLine, AttrB) based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
    "ProductLine", "AttrB"
).count()
total_count = df_demand_price_grid_load.count()
brand_cat_percents = brand_cat_counts.withColumn(
    "percent", F.col("count") / F.lit(total_count)
)
# Collect percentages as a dict with string keys (this will later determine
# the number of salt buckets each product receives)
brand_cat_percent_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['percent']
    for row in brand_cat_percents.collect()
}
# Collect counts as a dict with string keys (this will help
# add an extra bucket when counts are not divisible by the number of
# buckets for the product)
brand_cat_count_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['count']
    for row in brand_cat_percents.collect()
}
# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
    expr = []
    for k, v in d.items():
        expr.append(F.lit(k))
        expr.append(F.lit(v))
    return expr

percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))
# Add the string key column in PySpark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "product_cat_key",
    F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "percent", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
    "product_count", count_case.getItem(F.col("product_cat_key"))
)
# Set min/max buckets
min_buckets = 10
max_buckets = 1160
# Calculate buckets per row based on the (ProductLine, AttrB) share
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets_base",
    (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int")
)
# Add an extra bucket if product_count is not divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets",
    F.when(
        (F.col("product_count") % F.col("buckets_base")) != 0,
        F.col("buckets_base") + 1
    ).otherwise(F.col("buckets_base"))
)
# Generate a salt per row based on the (ProductLine, AttrB) bucket count
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "salt",
    (F.rand(seed=42) * F.col("buckets")).cast("int")
)
# Perform the repartition using the core attributes and the salt column
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
    1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "percent", "product_count", "buckets_base", "buckets", "salt")
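As a quick sanity check of what the salt does, here is a plain-Python simulation (using the random module in place of F.rand, with an illustrative bucket count):

```python
import random
from collections import Counter

random.seed(42)
buckets = 470  # illustrative bucket count for a large product slice
rows = 1_000_000

# Mirror of (F.rand() * buckets).cast("int"): a uniform bucket id per row
salt = [int(random.random() * buckets) for _ in range(rows)]
counts = Counter(salt)

assert len(counts) == buckets  # every bucket receives rows
spread = max(counts.values()) / min(counts.values())
print(f"max/min bucket fill ratio: {spread:.2f}")
```

The fill ratio stays close to 1, so each downstream task sees a comparable number of rows.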
Finally, we save our dataset to the feature table and enforce a maximum number of rows per file. This prevents Spark from generating partitions with too many rows, which it can do even when we have already computed the salt.
Why do we enforce 1M rows? The primary focus is model inference time, not so much file size. After several tests with 1M, 1.5M, and 2M, the first yielded the best performance in our case. Again, this project is very budget- and time-constrained, so we have to make the most of our resources.
(df_demand_price_grid_load.write
    .mode("overwrite")
    .option("replaceWhere", f"RunDate = '{params['RunDate']}'")
    .option("maxRecordsPerFile", 1_000_000)
    .partitionBy("RunDate", "AttrB", "ProductLine")
    .saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid"))
Why not just rely on Spark's Adaptive Query Execution (AQE)?
Recall that the primary focus is on inference times, not on metrics tuned for typical Spark SQL queries, such as file size. Relying only on AQE was actually our initial attempt. As you will see in the results, the run times were very undesirable and did not maximize cluster utilization given our data proportions.
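For reference, these are the AQE-related settings in play. This is a sketch assuming an active `spark` session in a Databricks notebook, where the first setting is already on by default:

```python
# AQE is enabled by default in recent Spark/Databricks runtimes
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE coalesces small shuffle partitions and splits skewed ones, but only
# at shuffle boundaries -- it cannot fix how few file partitions exist on disk
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

That last point is the crux: our bottleneck was the initial scan of a handful of huge files, which AQE never sees.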
Machine Learning inference
There is a pipeline with four tasks, one per product. Each task performs the following general steps:
- Loads the features for the corresponding product
- Loads the subset of ML models for the corresponding product
- Performs inference on one half of the subset, sliced by AttrB
- Performs inference on the other half, sliced by AttrB
- Saves the data to the results table
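The steps above can be sketched as a small driver loop. Everything here is a hypothetical stand-in for the actual pipeline functions, including the AttrB values:

```python
# Hypothetical sketch of one per-product task; the real pipeline wires
# these callables to Spark reads, model loading, and Delta writes.
def run_product_task(product, run_date, load_features, load_models,
                     run_inference, save_results):
    models = load_models(product)
    results = []
    # one inference stage per distinct AttrB value (two in our dataset)
    for attr_b in ("B1", "B2"):
        features = load_features(product, attr_b, run_date)
        results.append(run_inference(models, features))
    save_results(product, results)
    return results

# Tiny stub run to show the control flow
out = run_product_task(
    "Product D", "2024-01-01",
    load_features=lambda p, b, d: [f"{p}|{b}|{d}"],
    load_models=lambda p: ["model"],
    run_inference=lambda m, f: len(f),
    save_results=lambda p, r: None,
)
print(out)  # [1, 1]
```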
We will focus on one of the inference stages so as not to overwhelm this article with numbers; the other stage is very similar in structure and results. You can see the DAG for the inference stage in Fig. 2.

It looks very straightforward, but run times can vary depending on how your data is stored and the size of your cluster.
Cluster configuration
For the inference stage we are analyzing, there is one cluster per product, tuned for the infrastructure limitations of the project as well as the distribution of the data:
- Product A: 1 worker (Standard_DS14v2, 14 cores)
- Product B: 5 workers (Standard_DS14v2, 70 cores)
- Product C: 1 worker (Standard_DS14v2, 14 cores)
- Product D: 35 workers (Standard_DS14v2, 420 cores)
In addition, Adaptive Query Execution is enabled by default, which lets Spark decide how best to handle the data given the context you provide.
Results and discussion
For each scenario, you will see an overview of the number of file partitions per product and the average number of rows per partition, to give you an indication of how many rows the ML system runs inference on per Spark task. Additionally, we present Spark UI metrics to examine run-time performance and inspect the distribution of data at inference time. We show the Spark UI portion only for Product D, the largest product, to avoid an excess of information. In addition, depending on the scenario, inference on Product D becomes the run-time bottleneck, which is another reason it is the primary focus of the results.
Non-Salted and Partitioned
You can see in Fig. 3 that the average file partition has tens of millions of rows, which means considerable run time for a single executor. The largest on average is Product C, with more than 45M rows in a single partition. The smallest is Product B, with roughly 12M rows on average.

Fig. 4 depicts the number of partitions per product, with a total of 26 across all of them. Looking at Product D, 18 partitions fall far short of the 420 cores we have available, and on average, each partition will perform inference on ~40M rows.

Take a look at Fig. 5. In total, the cluster spent 9.9 hours and still wasn't finished; we had to kill the job because it was becoming expensive and blocking other people's tests.

From the summary statistics in Fig. 6 for the tasks that did finish, we can see heavy skew in the partitions for Product D. The maximum input size was ~56M rows and the runtime was 7.8h.

Non-salted and Liquid
In this scenario, we observe very similar results in terms of the average number of rows per file partition and the number of partitions per product, as seen in Fig. 7 and Fig. 8, respectively.

Product D has 19 file partitions, still far short of 420 cores.

We could already anticipate that this experiment was going to be very expensive, so I decided to skip the inference test for this scenario. Again, in an ideal situation we would carry it forward, but there is a backlog of tickets on my board.
Salty and Partitioned
After applying the salting and repartitioning process, we end up with ~2.5M rows per partition on average for Products A and B, and ~1M for Products C and D, as depicted in Fig. 9.

Moreover, we can see in Fig. 10 that the number of file partitions increased to roughly 860 for Product D, which gives 430 for each inference stage.

This results in a run time of 3h for inference on Product D, with 360 tasks, as seen in Fig. 11.

Checking the summary statistics in Fig. 12, the distribution looks balanced, with run times around 1.7h, but there is a maximum task taking 3h, which is worth investigating further in the future.

One great benefit is that the salt distributes the data according to the proportions of the products. If we had more resources available, we could increase the number of shuffle partitions in repartition() and add workers in proportion to the data. This ensures that our process scales predictably.
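A sketch of that proportional scaling, using the approximate shares from the data landscape and an illustrative worker budget:

```python
# Allocate workers proportionally to each product's share of the rows,
# with a floor of one worker per product (illustrative numbers).
shares = {"A": 0.019, "B": 0.008, "C": 0.176, "D": 0.797}
total_workers = 42

allocation = {p: max(1, round(s * total_workers)) for p, s in shares.items()}
print(allocation)  # {'A': 1, 'B': 1, 'C': 7, 'D': 33}
```

Because the salt already splits each product's rows into proportional buckets, adding workers along the same proportions keeps every cluster similarly busy.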
Salty and Liquid
This scenario combines the two strongest levers we have explored so far: salting to control file size and parallelism, and liquid clustering to keep related data colocated without rigid partition boundaries.
After applying the same salting strategy and a 1M row limit per partition, the liquid-clustered table shows a very similar average partition size to the salted and partitioned case, as shown in Fig. 13. Products C and D remain close to the 1M row target, while Products A and B settle slightly above that threshold.

However, the main difference appears in how these partitions are distributed and consumed by Spark. As shown in Fig. 14, Product D again reaches a high number of file partitions, providing enough parallelism to saturate the available cores during inference.

Unlike the partitioned counterpart, liquid clustering allows Spark to adapt the file layout over time while still benefiting from the salt. This results in a more even distribution of work across executors, with fewer extreme outliers in both input size and task duration.
From the summary statistics in Fig. 15, we observe that the majority of tasks complete within a tight runtime window, and the maximum task duration is lower than in the salty and partitioned scenario. This indicates reduced skew and better load balancing across the cluster.


An important side effect is that liquid clustering preserves data locality for the filtered columns without enforcing strict partition boundaries. This allows Spark to still benefit from data skipping, while the salt ensures that no single executor is overwhelmed with tens of millions of rows.
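For reference, a table is liquid-clustered at creation (or altered later) with CLUSTER BY. The table and column names below mirror this article's read query, but the exact DDL is a sketch:

```sql
-- Liquid clustering on the columns used in the inference filter;
-- no rigid directory-style partitions, clustering adapts via OPTIMIZE.
CREATE TABLE catalog.schema.FeatureStore (
  Id BIGINT,
  ProductLine STRING,
  AttrB STRING,
  RunDate DATE
)
CLUSTER BY (RunDate, ProductLine, AttrB);

-- Incrementally recluster newly appended data
OPTIMIZE catalog.schema.FeatureStore;
```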
Overall, salty and liquid emerges as the most robust setup: it maximizes parallelism, minimizes skew, and reduces operational risk when inference workloads grow or cluster configurations change.
Key Takeaways
- Inference scalability is often limited by data layout, not model complexity. Poorly sized file partitions can leave hundreds of cores idle while a few executors process tens of millions of rows.
- Partitioning alone is not enough for large-scale inference. Without controlling file size, partitioned tables can still produce massive partitions that lead to long-running, skewed tasks.
- Salting is an effective tool to unlock parallelism. Introducing a salt key and enforcing a row limit per partition dramatically increases the number of runnable tasks and stabilizes runtimes.
- Liquid clustering complements salting by reducing skew without rigid boundaries. It allows Spark to adapt the file layout over time, making the system more resilient as data grows.

