Pipeline auto-processing

Overview

Pipeline auto-processing is designed to keep source data and pipeline output in sync. Without this capability, you would have to manually trigger processing or provide external scripts, schedulers, or triggers. Auto-processing includes these features:

  • Full sync: Insert/delete/update are all handled automatically. No lost updates, missing data, or stale records.
  • Change detection: Only new or changed records are processed. No unnecessary reprocessing of known records.
  • Batch processing: Records are grouped into batches to be processed concurrently. Reduces overhead and achieves optimal performance, for example, with GPU-based AI model inference tasks.
  • Background processing: When background processing is enabled, the pipeline runs in a background worker process so that it doesn't block or delay other DB operations. Ideal for processing huge datasets.
  • Live processing for Postgres tables: When the data source is a Postgres table, live trigger-based auto-processing can be enabled so that pipeline results are always up to date.
  • Quick turnaround: Once a batch has finished processing, the results are immediately available. No full listing of the source data is needed to start processing, which is important for large external volumes where a full listing can take a long time.

Example for knowledge base pipeline

Suppose a knowledge base is created for a Postgres table containing products with product descriptions. The user configures background auto-processing to always keep embeddings in sync without blocking or delaying any operations on the products table.
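
As a minimal sketch, such a setup could look like the following. The creation function and parameter names are assumptions and may differ in your AIDB version; the table, column, and model names are illustrative.

SELECT aidb.create_table_knowledge_base(
    name               => 'products_kb',
    model_name         => 'my_embedding_model',   -- assumed, pre-created model
    source_table       => 'products',
    source_data_column => 'description',
    auto_processing    => 'Background'            -- keep embeddings in sync asynchronously
);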

The pipeline processes any preexisting product records in the background. The user can query the statistics table to see the progress.

The background process runs when new data is inserted or existing data is modified or deleted.

Queries on the knowledge base (that is, retrieval operations) always return accurate results, subject only to a small background processing delay.

Supported pipelines and modes

Knowledge base pipeline

 Source type | Destination type | Live | Background | Disabled (manual)
-------------+------------------+------+------------+-------------------
 Table       | Table            | Yes  | Yes        | Yes
 Volume      | Table            | No   | Yes        | Yes

Knowledge base pipelines don't support outputting to volumes; the destination must be a table in a database with vector capabilities.

Preparer pipeline

 Source type | Destination type | Live | Background | Disabled (manual)
-------------+------------------+------+------------+-------------------
 Table       | Table            | Yes  | No         | Yes
 Table       | Volume           | Yes  | No         | Yes
 Volume      | Table            | No   | No         | Yes
 Volume      | Volume           | No   | No         | Yes

The preparer pipeline doesn't yet support batch processing or background auto-processing.

Auto-processing modes

The following auto-processing modes are available to suit different requirements and use cases.

Live

AIDB sets up Postgres triggers on the source table to immediately process any changes. Processing happens in the trigger function. This means it happens in the same transaction that modifies the data so that results are up to date.

Pros & cons

  • Transactional guarantee/immediate results. Pipeline results are always up to date with the source data.
  • Blocks/delays operations on the source data. Modifying transactions on the source data are delayed until processing is complete.
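
For example, with live processing on the products table from the example above, a modifying statement returns only once the pipeline has processed the change:

-- The INSERT commits together with the embedding computation, so a
-- retrieval immediately afterwards already sees the new record.
INSERT INTO products (description) VALUES ('A red touring bicycle');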

Background

AIDB starts a Postgres background worker for each pipeline that has background auto-processing configured. Processing happens asynchronously based on a configurable background_sync_interval. See Change detection for details on how the pipelines are processed.

Note

Make sure Postgres allows running enough background workers for the number of pipelines where you want to use this processing mode. Use the Postgres setting max_worker_processes to control this value.
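
For example, to raise the limit (max_worker_processes is a standard Postgres setting; changing it requires a server restart):

-- Allow up to 16 worker processes in total. Leave headroom for the workers
-- Postgres itself needs, in addition to one worker per background pipeline.
ALTER SYSTEM SET max_worker_processes = 16;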

Pros & cons

  • Asynchronous execution means transactions on the source data aren't delayed while the changes are processed.
  • Results are delayed and might become backlogged.
  • Ideal for huge datasets. Processing occurs continuously in the background and isn't tied to any user session or SQL function call.

Disabled

Auto-processing is disabled. You can manually call aidb.bulk_embedding() to process the pipelines.
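
For example (passing the pipeline name as the argument is an assumption; check the function reference for the exact signature):

-- Manually process all source records for this pipeline.
SELECT aidb.bulk_embedding('products_kb');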

Note

On table knowledge bases, change detection is also disabled, since it requires active triggers on the source table. This means manual processing (via aidb.bulk_embedding()) has to process all the records in the source.

Observability

You can see detailed status and progress output for all auto-processing modes.

A good place to get an overview is the statistics table. Look up the view aidb.knowledge_base_stats, or use its short alias aidb.kbstat. The view shows all configured knowledge base pipelines, which processing mode is set, and statistics about the processed records:

SELECT * FROM aidb.kbstat;
Output
     knowledge base     | auto processing | table: unprocessed rows | volume: scans completed | count(source records) | count(embeddings)
------------------------+-----------------+-------------------------+-------------------------+-----------------------+-------------------
 kb_table_text_bg       | Background      |                       0 |                         |                    15 |                15
 kb_table_text_manual   | Disabled        |                       0 |                         |                    15 |                15
 kb_table_image_manual  | Disabled        |                       0 |                         |                     3 |                 3
 kb_table_text_live     | Live            |                       0 |                         |                    15 |                15
 kb_table_image_bg      | Background      |                       0 |                         |                     3 |                 3
 kb_volume_text_bg      | Background      |                         |                       6 |                     7 |                 7
 kb_volume_text_manual  | Disabled        |                         |                       0 |                     0 |                 0
 kb_volume_image_bg     | Background      |                         |                       4 |                   177 |                 6
 kb_volume_image_manual | Disabled        |                         |                       1 |                   177 |                 6
(9 rows)

The change detection mechanism is central to how auto-processing works. It's different for volume and table sources. For this reason, the stats table has different columns for these two source types.

  • table: unprocessed rows: How many unique rows are listed in the backlog of change events.
    • If auto-processing is disabled, no (new) change events are captured.
  • volume: scans completed: How many full listings of the source have been completed so far.
  • count(source records): How many records exist in the source for this pipeline.
    • For table sources, this number is always accurate.
    • For volume sources, this number is updated only after a full scan has completed.
  • count(embeddings): How many embeddings exist in the vector destination table for this pipeline.

Configuration

You can configure auto-processing at creation time:
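
For example (as in the sketch in the knowledge base example above, the function and parameter names are assumptions for your version):

SELECT aidb.create_table_knowledge_base(
    name               => 'products_kb',
    model_name         => 'my_embedding_model',
    source_table       => 'products',
    source_data_column => 'description',
    auto_processing    => 'Live'    -- or 'Background' or 'Disabled'
);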

You can also configure it for existing pipelines:
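
For example (aidb.set_auto_processing is an assumed function name, shown for illustration only):

-- Switch an existing pipeline to background processing.
SELECT aidb.set_auto_processing('products_kb', 'Background');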

Batch processing

In background and disabled modes, (auto) processing happens in batches of configurable size. The pipeline processes all source records in batches. All records in each batch are processed in parallel wherever possible. This means pipeline steps like data retrieval, embeddings computation, and storing embeddings run as parallel operations. For example, when using a table as a data source, a batch of input records is retrieved with a single query. With a volume source, concurrent requests are used to retrieve a batch of records.

See Pipelines knowledge base performance tuning to learn how to tune the batch size for optimal throughput.

Change detection

AIDB auto-processing is designed around change detection mechanisms for table and volume data sources. Change detection allows it to process data only when necessary.

Table sources

When background auto-processing is enabled, Postgres triggers are set up on the source table to detect changes. These triggers are very lightweight. They only record change events and insert them into a "change events" table. No actual processing happens in the trigger function.

The background worker then processes these events asynchronously.
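
Conceptually, the capture side resembles the following simplified sketch. This is not AIDB's actual implementation; it assumes a products table with an id primary key and only illustrates that the trigger records the event and defers all work:

-- A minimal change-events backlog and a trigger that only records events.
CREATE TABLE change_events (
    source_table text,
    operation    text,
    row_id       bigint,
    recorded_at  timestamptz DEFAULT now()
);

CREATE OR REPLACE FUNCTION record_change() RETURNS trigger AS $$
BEGIN
    -- Record the event only; the background worker does the processing.
    INSERT INTO change_events (source_table, operation, row_id)
    VALUES (TG_TABLE_NAME, TG_OP, COALESCE(NEW.id, OLD.id));
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER products_change_capture
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION record_change();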

Volume sources

This source type provides a last_modified timestamp for each source record. The system keeps track of those timestamps in a "state" table. In each pipeline execution, the system lists the contents of the volume and compares it to the timestamps to see whether any records have changed or were added.

The system detects deleted objects after a full listing is complete. Only then can it be certain that a previously processed record is no longer present in the source.
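
As a conceptual sketch, the comparison amounts to queries like these (the table and column names are illustrative, not AIDB internals):

-- New or changed records: present in a fresh listing with a newer
-- last_modified timestamp than the state table remembers.
SELECT l.record_key
FROM fresh_listing l
LEFT JOIN pipeline_state s USING (record_key)
WHERE s.record_key IS NULL
   OR l.last_modified > s.last_modified;

-- Deleted records: remembered in the state table but absent from a
-- completed full listing.
SELECT s.record_key
FROM pipeline_state s
LEFT JOIN fresh_listing l USING (record_key)
WHERE l.record_key IS NULL;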

Note

Object stores (and the other external storage locations supported by volumes) have limited query capabilities. As a result, change detection for volumes is based on polling, that is, on repeatedly listing the source. This can be an expensive operation on cloud object stores like AWS S3. To control the cost, use a long background_sync_interval (such as once per day) on pipelines with volume sources.
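
For example, to poll a volume source only once per day (background_sync_interval is the documented setting; how it's passed here is an assumption for illustration):

-- Sketch only: a long interval keeps listing costs low on object stores.
SELECT aidb.set_auto_processing('my_volume_kb', 'Background',
                                background_sync_interval => '1 day');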

