Skip to main content
This example walks through a complete Geneva workflow: creating a raw table in S3, adding computed columns via a backfill on a KubeRay cluster, and materializing a view with embeddings for downstream search. The dataset is a product catalog with titles and descriptions. We’ll compute word_count and price_tier feature columns, then create a materialized view that adds text embeddings.

0. What you need to run this

Before running this example, you’ll need:
  • An S3 bucket (or other cloud object storage) that Geneva can read and write to
  • A Kubernetes cluster with the KubeRay operator installed and Geneva deployed
  • IAM credentials granting your Geneva client access to the bucket and Kubernetes API — see Manual Deployment for the required permissions
Fill in these constants and the rest of the code will run as-is:
# Root URI where Geneva stores the tables (an S3, GCS, or Azure Blob Storage path).
CLOUD_OBJECT_STORAGE_LOCATION = "s3://my-bucket/product-catalog"
# Kubernetes namespace in which Geneva is deployed; the KubeRay cluster below is created here too.
K8S_NAMESPACE = "geneva"

1. Connect and create a table

import pyarrow as pa
import geneva

db = geneva.connect(CLOUD_OBJECT_STORAGE_LOCATION)

# Schema of the raw product table, as (name, type) pairs.
schema = pa.schema([
    ("product_id", pa.int64()),
    ("title", pa.string()),
    ("description", pa.string()),
    ("category", pa.string()),
    ("price", pa.float64()),
])

# Column-oriented sample data: one list per column, one entry per product.
columns = {
    "product_id": [1, 2, 3, 4, 5],
    "title": [
        "Chainmail Coif",
        "Jousting Lance Grip Tape",
        "Dragon-Repellent Spray",
        "Sword Squeegee",
        "Visor Windshield Wipers",
    ],
    "description": [
        "Premium riveted chainmail head covering. Breathable enough for dragon fire, probably.",
        "Non-slip grip tape for jousting lances. 3000 PSI tensile strength. Void where tilting is prohibited.",
        "All-natural herbal spray. Dragons hate it. Effectiveness not guaranteed against actual dragons.",
        "Ergonomic squeegee fits all standard broadswords. Removes blood, mud, and existential dread.",
        "Hand-cranked windshield wipers for full-face visors. Never ride blind into battle again.",
    ],
    "category": ["armor", "tournament", "defense", "maintenance", "armor"],
    "price": [129.99, 45.00, 59.99, 34.99, 24.99],
}
data = pa.table(columns, schema=schema)

# Start from a clean slate on re-runs: drop any earlier copy of the table.
# Any failure here (e.g. the table does not exist yet) is deliberately ignored.
try:
    db.drop_table("products_raw")
except Exception:
    pass
table = db.create_table("products_raw", data=data, schema=schema)

2. Define UDFs

import pyarrow as pa
from geneva import udf

@udf(data_type=pa.int32())
def word_count(description: str) -> int:
    """Return how many whitespace-separated words the description contains."""
    words = description.split()
    return len(words)

@udf(data_type=pa.string())
def price_tier(price: float) -> str:
    """Classify a price as "budget" (< 30), "mid-range" (< 75), or otherwise "premium"."""
    if price < 30:
        return "budget"
    if price < 75:
        return "mid-range"
    return "premium"

3. Register columns and run a backfill on KubeRay

from geneva.cluster.builder import KubeRayClusterBuilder
from geneva.manifest.builder import PipManifestBuilder
import sys
import ray
from importlib.metadata import version
from geneva.utils.ray import get_ray_image

# Python packages installed on every Ray worker. Pinning geneva to the
# locally installed version keeps the client and the workers in lockstep.
worker_packages = [
    "sentence-transformers==3.3.1",
    "torch==2.5.1",
    f"geneva[udf-text-sentence-transformers]=={version('geneva')}",
]
manifest = (
    PipManifestBuilder.create("product-catalog-manifest")
    .pip(worker_packages)
    .build()
)
db.define_manifest("product-catalog-manifest", manifest)

# The Ray image must match (1) your local Ray and Python versions and
# (2) the CPU architecture of your Kubernetes nodes. This example assumes
# x86-64 nodes, hence arm=False; set arm=True for ARM node pools.
# get_ray_image() picks the matching image.
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
ray_image = get_ray_image(ray.__version__, python_version, arm=False)

# KubeRay cluster: a 4-CPU/8Gi head plus one CPU worker group that
# scales between 1 and 3 replicas of 4 CPUs/16Gi each.
cluster = (
    KubeRayClusterBuilder.create("product-catalog-cluster")
    .namespace(K8S_NAMESPACE)
    .head_group(cpus=4, memory="8Gi", image=ray_image)
    .add_worker_group(
        KubeRayClusterBuilder.cpu_worker()
        .cpus(4)
        .memory("16Gi")
        .min_replicas(1)
        .max_replicas(3)
        .image(ray_image)
        .build()
    )
    .build()
)
db.define_cluster("product-catalog-cluster", cluster)

# Register the UDF-backed columns on the source table; their values are
# computed by the backfill below.
computed_columns = {"word_count": word_count, "price_tier": price_tier}
table.add_columns(computed_columns)

# Backfill each computed column, in registration order, on the KubeRay cluster.
with db.context(cluster="product-catalog-cluster", manifest="product-catalog-manifest"):
    for column_name in computed_columns:
        table.backfill(column_name)

4. Create a materialized view with embeddings

The materialized view selects a subset of columns from the source table — here we drop price and price_tier, keeping only what’s needed for search. After creating the materialized view, we refresh it to populate its rows, then add and backfill a new column of embeddings.
from geneva.udfs import sentence_transformer_udf

# Define an embedding UDF using a template from LanceDB's UDF library.
# Passing `dimension` explicitly avoids loading the model locally just to
# discover its output size (BAAI/bge-small-en-v1.5 produces 384-dim vectors).
embed = sentence_transformer_udf(
    column="description",
    model="BAAI/bge-small-en-v1.5",
    normalize=True,
    dimension=384,
)

# Select only the columns needed for search — price and price_tier are excluded.
query = (
    table.search()
    .select(["product_id", "title", "description", "category", "word_count"])
)

# Create the materialized view from the column subset only. The embedding
# column is registered exactly once, with add_columns() below. Also passing it
# here would define the same "embedding" column a second time.
db.create_materialized_view("products_enriched", query)

# Populate the materialized view, then add and compute the embedding column.
enriched = db.open_table("products_enriched")
with db.context(cluster="product-catalog-cluster", manifest="product-catalog-manifest"):
    enriched.refresh()  # copies the selected rows from the source table
    enriched.add_columns({"embedding": embed})
    enriched.backfill("embedding")  # refresh() adds rows but doesn't compute UDF columns

5. Query the enriched table

import pyarrow.compute as pc

enriched = db.open_table("products_enriched")

# Placeholder query vector. In a real application, embed the query text with the
# same model as the "embedding" column. The length must equal that column's
# dimension (384 for BAAI/bge-small-en-v1.5).
query_vector = [0.1] * 384

# Vector search: the 3 nearest products.
results = (
    enriched.search(query_vector, vector_column_name="embedding")
    .limit(3)
    .to_arrow()
)

# The same search, restricted to the armor category.
armor_results = (
    enriched.search(query_vector, vector_column_name="embedding")
    .where("category = 'armor'")
    .limit(3)
    .to_arrow()
)

6. Incremental refresh

As new products are added to the source table, backfill its computed columns for the new rows, then refresh the view and compute embeddings for the newly added rows only:
# Two new products to append to the source table.
new_data = pa.table({
    "product_id": [6, 7],
    "title": ["Moat Floaties", "Knight-Night Sleep Mask"],
    "description": [
        "Inflatable arm floaties in battleship grey. Because even knights can't always swim in full plate.",
        "Padded silk sleep mask embroidered with your coat of arms. Blocks out 100% of torchlight.",
    ],
    "category": ["defense", "armor"],
    "price": [29.99, 19.99],
})
table.add(new_data)

with db.context(cluster="product-catalog-cluster", manifest="product-catalog-manifest"):
    # Fill the source table's computed columns for the appended rows
    # (backfill only fills null values).
    for column_name in ("word_count", "price_tier"):
        table.backfill(column_name)
    # Pull the new rows into the view, then compute their embeddings.
    # refresh() adds the rows but does not compute UDF columns.
    enriched.refresh()
    enriched.backfill("embedding")