✍🏼 About The Author:
Casey Luo, StarRocks Committer & Engineer at CelerData
 

TL;DR

Under the storage–compute separation (shared-data) architecture, one-time ingestion of massive historical datasets has become an amplified but often overlooked risk. This article explains how StarRocks rethinks the large-scale ingestion path at its source. By redesigning the write pipeline (memory → local disk spill → centralized merge → object storage), StarRocks minimizes remote writes and redundant overhead, significantly reduces the number of S3 write operations, and improves write throughput by fully utilizing local I/O capacity. This approach addresses the small-file problem at its root, enabling higher efficiency and stability at a lower overall cost.
Note: This optimization is available starting with StarRocks 3.5 and does not apply to earlier versions.
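If you are not sure which version your cluster is running, you can check it with the built-in current_version() function, for example:
SELECT current_version();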
 

Large-Scale Ingestion Becomes an "Amplified Problem" in Shared-Data Architecture

As more users migrate large volumes of historical data into StarRocks, one-time bulk ingestion has become a common operational pattern. On the surface, this looks like a straightforward offline data load. In practice, however, under a shared-data architecture backed by object storage, improper handling can easily trigger a chain reaction: degraded ingestion performance, explosive growth in small files at the storage layer, and ultimately impaired query performance.
As a distributed columnar database, StarRocks adopts an LSM-tree–like storage model. Newly ingested data is first written into in-memory memtables. After sorting and other processing, background threads flush these memtables to persistent storage, and subsequent compaction merges multiple small files into larger, ordered ones. Under normal incremental write workloads, this design balances write efficiency with query performance effectively. But when ingesting massive volumes of historical data in bulk, the same mechanisms can become a bottleneck—and the issues are significantly magnified:
  • Huge data volumes and many tablets. Historical datasets often span a large number of tablets. Each tablet maintains its own memtable, and under high-concurrency ingestion, memtables are flushed frequently, generating a large number of small files in a short time.
  • Limited compute resources during ingestion. In shared-data architecture deployments, users often start with a small number of compute nodes (CNs), sometimes even a single CN, with modest CPU and memory. These constraints further exacerbate the pattern of small memtables, frequent flushes, and rapid accumulation of small files.
  • Early scale-down after ingestion. One advantage of storage–compute separation is the ability to scale down or release compute resources immediately after bulk ingestion, retaining only the data in object storage to reduce costs. However, this also means that the large number of small files generated during ingestion may never be sufficiently compacted, leaving long-term fragmentation in the underlying storage.
  • Query performance degradation later on. When the cluster is scaled back up and queries are run against these historical datasets, the need to scan and process a vast number of small files can significantly degrade query performance.
In short, these issues are more pronounced in storage–compute separation environments because users naturally favor completing large historical ingestions with minimal, lower-spec compute resources. This choice amplifies the small-file problem, which then cascades into long-term performance penalties during query execution.
 

Reworking Bulk Ingestion at the Ingestion Entry Point

To truly address the small-file problem caused by large-scale ingestion, relying on downstream compaction alone is far from sufficient. A closer examination of the entire write pipeline reveals that the root causes are concentrated in several key areas:
  • Memory constraints force premature flushes. On compute nodes (CNs), memtables are often flushed before they are filled due to limited memory, resulting in relatively small files per flush.
  • High-latency remote writes in storage–compute separation. Under a storage–compute separation architecture, every flush writes directly to object storage. The combination of high-latency remote I/O and frequent write operations significantly degrades ingestion throughput.
  • Repeated heavy write work. Each flush triggers the full write pipeline: sorting, encoding, compression, and index construction. Repeating these CPU-intensive steps for small batches wastes substantial compute resources.
  • Redundant work during compaction. The excessive number of small files must later be read again and merged during compaction. Much of the earlier sorting and encoding effort becomes partially redundant, further amplifying resource waste.
Based on this analysis, StarRocks redesigns the bulk ingestion write path for storage–compute separation scenarios, optimizing it at the entry point of the ingestion pipeline.

1. Write Phase: Spill to Local Disk First

When a memtable is full, data is no longer written directly to object storage. Instead, StarRocks spills intermediate data to local disks on the CN. This approach avoids high-latency object storage writes and prevents repeated execution of heavy operations, such as sorting and encoding, before the data has stabilized. If local disk space becomes constrained, intermediate data can be selectively spilled to object storage (for example, S3) to ensure overall system robustness.

2. Consolidation Phase: Merge First, Then Write to Object Storage

Once the bulk ingestion task completes, StarRocks performs a centralized merge on the temporary spill files. These files are consolidated into well-structured, appropriately sized data files, which are then written to object storage in a single, efficient step.
In summary, the redesigned bulk ingestion pipeline can be described as:
Memory → Local Disk Spill → Centralized Merge → Object Storage
By restructuring the write path in this way, StarRocks tackles the small-file problem at its origin, delivering higher ingestion throughput, better resource utilization, and more stable performance in shared-data architecture deployments.
 
 
This optimized bulk ingestion path delivers clear benefits across three key dimensions:
  1. Higher write throughput during ingestion. When a memtable fills up, StarRocks spills intermediate results only to local disk instead of writing directly to backend object storage. By avoiding high-latency remote writes at this stage, the system significantly improves ingestion performance.
  2. Lower CPU and resource overhead. During the spill phase, the system simply persists memtable data to disk without running the full write pipeline of global sorting, encoding, and index construction. Skipping these expensive operations at intermediate stages reduces unnecessary CPU and memory consumption.
  3. Fewer, larger files with more predictable query performance. All temporary spill files are merged in a single, centralized step before being written to object storage, producing a much smaller number of well-sized data files. This dramatically reduces small-file proliferation and largely eliminates reliance on background compaction. As a result, even if queries are issued immediately after ingestion completes, they can run with stable and predictable performance.

Performance Comparison

To evaluate the real-world impact of the bulk ingestion optimizations described above, we designed two comparative tests on a shared-data architecture cluster:
  • Single-concurrency scenario: A single ingestion job loading 1 TB of data, comparing ingestion time and post-ingestion query performance before and after the optimization.
  • High-concurrency scenario: 10 concurrent ingestion jobs, each loading 100 GB (still 1 TB in total), comparing ingestion throughput and query performance after ingestion, before and after the optimization.

Test 1: Single-Concurrency Bulk Ingestion

In this test, we used Broker Load to ingest a 1 TB dataset (approximately 270 million rows) in a single concurrent job.
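For reference, a Broker Load job of this kind can be submitted with a statement of the following form (the label, source path, file format, and S3 credential properties below are illustrative placeholders, not the exact job used in this test):
LOAD LABEL test_db.bulk_load_1tb
(
    DATA INFILE("s3://<bucket>/<path>/*")
    INTO TABLE duplicate_21_0
    FORMAT AS "parquet"
)
WITH BROKER
(
    "aws.s3.region" = "<region>",
    "aws.s3.access_key" = "<access_key>",
    "aws.s3.secret_key" = "<secret_key>"
)
PROPERTIES
(
    "timeout" = "14400"
);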
Before the optimization:
  • The ingestion phase itself took approximately 2 hours and 15 minutes.
  • After ingestion was completed, the system spent an additional 34 minutes performing background compaction.
From a user’s perspective, the total time from submitting the ingestion job to the system returning to a stable, query-ready state was approximately 2 hours and 50 minutes.
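Job status of the form shown below can be retrieved with SHOW LOAD (the database name and label here are placeholders):
SHOW LOAD FROM test_db WHERE LABEL = "bulk_load_1tb"\G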
*************************** 3. row ***************************
JobId: 10409
State: FINISHED
Type: BROKER
SinkRows: 270000000
LoadStartTime: 2024-12-27 10:59:12
LoadFinishTime: 2024-12-27 13:14:04
After the ingestion was completed, the compaction score for the partition was:
AvgCS: 358.06 P50CS: 299.00 MaxCS: 1056.00
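In shared-data clusters, these per-partition compaction scores (AvgCS, P50CS, MaxCS) can be inspected with SHOW PARTITIONS, for example:
SHOW PARTITIONS FROM duplicate_21_0;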
When the ingestion finished, the following query was executed immediately:
mysql> select count(*) from duplicate_21_0;
+-----------+
| count(*) |
+-----------+
| 270000000 |
+-----------+
1 row in set (56.25 sec)
After the optimization, the total ingestion time was approximately 2 hours and 42 minutes.
*************************** 2. row ***************************
JobId: 10642
State: FINISHED
Type: BROKER
SinkRows: 270000000
LoadStartTime: 2024-12-27 16:14:08
LoadFinishTime: 2024-12-27 18:56:00
After ingestion was completed, the compaction score was already at the optimal level, requiring no background compaction.
AvgCS: 2.39 P50CS: 2.00 MaxCS: 5.00
Immediately after the ingestion was completed, the query was executed:
mysql> select count(*) from duplicate_21_0;
+-----------+
| count(*) |
+-----------+
| 270000000 |
+-----------+
1 row in set (0.72 sec)

Test 2: High-Concurrency Bulk Ingestion Stress Test

In this test, we ran a high-concurrency ingestion workload on a total dataset of 1 TB. The target table contained 28 partitions, with 256 tablets per partition.
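For reference, a table with this layout can be declared roughly as follows (the table name, column names, key definition, and partition expression are illustrative placeholders, not the exact schema used in the test):
CREATE TABLE bulk_target_table
(
    event_time DATETIME NOT NULL,
    user_id    BIGINT,
    payload    STRING
)
DUPLICATE KEY (event_time)
-- With expression partitioning, 28 days of data produce the 28 partitions described above.
PARTITION BY date_trunc('day', event_time)
-- 256 buckets per partition correspond to the 256 tablets per partition.
DISTRIBUTED BY HASH(user_id) BUCKETS 256;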
Before the optimization, ingestion was constrained by the CPU and memory resources of the compute nodes. As a result, the workload failed to complete within the 4-hour timeout window and was eventually automatically canceled by the system. The final job state is shown below:
*************************** 10. row ***************************
JobId: 11458
State: CANCELLED
Type: BROKER
Priority: NORMAL
ScanRows: 21905408
LoadStartTime: 2025-01-06 17:11:46
LoadFinishTime: 2025-01-06 21:11:44
After the optimization:
*************************** 20. row ***************************
JobId: 28336
State: FINISHED
Type: BROKER
Priority: NORMAL
ScanRows: 30000000
LoadStartTime: 2025-01-06 20:10:49
LoadFinishTime: 2025-01-06 20:27:59
Under the same conditions, the 10 concurrent ingestion jobs started at 2025-01-06 20:10:49 and all completed by 2025-01-06 20:36:10, for a total runtime of approximately 25 minutes.
Although these jobs did trigger the compaction threshold, the system’s compaction score remained within a healthy and stable range throughout the ingestion and at completion.
AvgCS: 10.00 P50CS: 10.00 MaxCS: 10.00
In addition, we can compare several key backend object storage metrics before and after the optimization:
 
(Figure: Key S3 Metrics Before Optimization)
(Figure: Key S3 Metrics After Optimization)
 
(Figure: Comparison of Local Disk I/O Utilization Before and After Optimization)
 
The results clearly show that after this optimization is enabled:
  1. Significantly fewer writes to S3, with much higher throughput. The number of write operations to S3 is drastically reduced, while overall write throughput increases substantially. At the same time, the average object size grows significantly, helping lower storage costs and improving both read and write efficiency.
  2. More effective utilization of local disk I/O during ingestion. The ingestion process is able to fully leverage local disk I/O capacity, leading to a noticeable improvement in overall ingestion performance.
 

Summary

By optimizing bulk data ingestion at the engine level, StarRocks effectively avoids the proliferation of small files that commonly occurs under resource constraints, especially limited memory, during historical data backfill. This enables users in shared-data architecture environments to achieve higher efficiency and more stable performance with lower infrastructure investment.
 

Want to dive deeper into technical details or ask questions? Join StarRocks Slack to continue the conversation!