A scanned batch or a shared inbox rarely holds one clean document. It holds an invoice, a receipt, and a purchase order merged into a single PDF, in no particular order. You can’t run one extraction schema over the whole thing, because the fields you want from an invoice aren’t the fields you want from a receipt.
This recipe handles that in two stages. First we split the pile into one file per document type, which also tells us what each file is. Then we run a different extraction schema over each file, chosen by its type. The result is a tidy object keyed by document type, with the right fields pulled from each.
This recipe chains two endpoints covered on their own in the Splitting quickstart and the Extract quickstart. Read those first if you want the detail on either call in isolation.

What We’ll Build

A script that:
  1. Sends a multi-document PDF to /splitter along with the document types we expect to find.
  2. Gets back one PDF per type, each labelled with its category and hosted at a URL.
  3. Picks a matching extraction schema for each file and sends it to /v2/extract by URL.
  4. Collects the extracted fields into a single object, keyed by document type.
We’ll run it against a three-page sample that holds an invoice, a receipt, and a purchase order. The full script follows if you’d rather skip the walkthrough.
Set UNSILOED_API_KEY in your environment and save the document pile as pile.pdf in the same directory before running.
sort_and_extract.py
import json
import os
import time
import requests

API_KEY = os.environ["UNSILOED_API_KEY"]
BASE_URL = "https://prod.visionapi.unsiloed.ai"

# The document types we expect in the pile. The description guides the split.
CATEGORIES = [
    {"name": "Invoice", "description": "A bill issued by a seller requesting payment for goods or services"},
    {"name": "Receipt", "description": "Proof of a completed purchase from a store or point of sale"},
    {"name": "Purchase Order", "description": "A buyer-issued document authorizing a purchase from a vendor"},
]

# One extraction schema per type. The keys match the category names above.
SCHEMAS = {
    "Invoice": {
        "type": "object",
        "properties": {
            "vendor_name": {"type": "string", "description": "Company issuing the invoice (the seller)"},
            "invoice_number": {"type": "string", "description": "Invoice identifier"},
            "total_due": {"type": "number", "description": "Total amount due in US dollars"},
        },
        "required": ["vendor_name", "invoice_number", "total_due"],
        "additionalProperties": False,
    },
    "Receipt": {
        "type": "object",
        "properties": {
            "store_name": {"type": "string", "description": "Name of the store"},
            "transaction_id": {"type": "string", "description": "Transaction or receipt ID"},
            "total": {"type": "number", "description": "Total amount paid in US dollars"},
        },
        "required": ["store_name", "transaction_id", "total"],
        "additionalProperties": False,
    },
    "Purchase Order": {
        "type": "object",
        "properties": {
            "po_number": {"type": "string", "description": "Purchase order number"},
            "vendor_name": {"type": "string", "description": "Vendor the order is placed with"},
            "required_by": {"type": "string", "description": "Date the order is required by"},
        },
        "required": ["po_number", "vendor_name", "required_by"],
        "additionalProperties": False,
    },
}

def wait_for(url):
    """Poll a job URL until it finishes, then return the completed response."""
    for _ in range(90):  # roughly 6 minutes at 4 seconds per poll
        # A timeout keeps one stalled request from hanging the whole loop.
        resp = requests.get(url, headers={"api-key": API_KEY}, timeout=30)
        resp.raise_for_status()  # surface HTTP errors instead of failing inside .json()
        job = resp.json()
        if job.get("status") in ("completed", "review"):
            return job
        if job.get("status") == "failed":
            raise RuntimeError(job.get("error", "job failed"))
        time.sleep(4)
    raise TimeoutError(f"Job at {url} did not finish in time")

# Stage 1: split the pile into one file per document type.
with open("pile.pdf", "rb") as f:
    resp = requests.post(
        f"{BASE_URL}/splitter",
        headers={"api-key": API_KEY},
        files={"file": ("pile.pdf", f, "application/pdf")},
        data={"categories": json.dumps(CATEGORIES)},
    )
resp.raise_for_status()
job = resp.json()
sections = wait_for(f"{BASE_URL}/splitter/{job['job_id']}")["result"]["files"]
print(f"Split into {len(sections)} documents: {[s['name'] for s in sections]}")

# Stage 2: extract type-specific fields from each section, by URL.
extract_jobs = {}
for section in sections:
    category = os.path.splitext(section["name"])[0]
    schema = SCHEMAS.get(category)
    if schema is None:
        print(f"No schema for '{category}', skipping.")
        continue
    resp = requests.post(
        f"{BASE_URL}/v2/extract",
        headers={"api-key": API_KEY},
        data={
            "file_url": section["full_path"],
            "schema_data": json.dumps(schema),
            "model": "gamma",
        },
    )
    resp.raise_for_status()
    extract_jobs[category] = resp.json()["job_id"]

results = {}
for category, job_id in extract_jobs.items():
    done = wait_for(f"{BASE_URL}/extract/{job_id}")
    results[category] = {field: cell["value"] for field, cell in done["result"].items()}

print(json.dumps(results, indent=2))

Step 1: Set Up Your Environment

Before writing any code, we need three things: an API key, a document pile, and a Python runtime.

1.1 Get an Unsiloed AI API Key

To get API access, sign up on Unsiloed AI. Export your key as an environment variable so it stays out of source control:
export UNSILOED_API_KEY="your-api-key"

1.2 Pick a Document Pile

The walkthrough assumes a PDF saved as pile.pdf in your working directory. To follow along with the exact output shown below, download our sample document pile: a three-page PDF that holds an invoice, a store receipt, and a purchase order, one per page. The three documents are unrelated and in mixed order, which is exactly the case this recipe is built for.

1.3 Install Dependencies

You need Python 3.8 or newer. Install the requests package:
pip install requests

Step 2: Split the Pile by Document Type

The splitter takes the whole PDF plus a list of the document types we expect, and returns one new PDF per type. Each returned file is labelled with the category it matched and hosted at a URL we can hand straight to the extractor in the next step, so there’s no need to save anything to disk in between.

2.1 Describe the Document Types

A category is a name and a description. The description does the work: it tells the splitter how to recognize each type, so the clearer it is, the more reliable the split. We’re looking for three types.
Create a file called sort_and_extract.py with the configuration and categories:
sort_and_extract.py
import json
import os
import time
import requests

API_KEY = os.environ["UNSILOED_API_KEY"]
BASE_URL = "https://prod.visionapi.unsiloed.ai"

CATEGORIES = [
    {"name": "Invoice", "description": "A bill issued by a seller requesting payment for goods or services"},
    {"name": "Receipt", "description": "Proof of a completed purchase from a store or point of sale"},
    {"name": "Purchase Order", "description": "A buyer-issued document authorizing a purchase from a vendor"},
]

2.2 Submit the Pile and Wait for the Split

Both endpoints in this recipe run asynchronously: we submit a job, get a job_id, and poll until it’s done. Since we do that twice, it’s worth a small helper. Then we post the pile to /splitter with the categories as a JSON string under the categories field.
Add the polling helper and the split call:
sort_and_extract.py
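def wait_for(url):
    """Poll a job URL until it finishes, then return the completed response."""
    for _ in range(90):  # roughly 6 minutes at 4 seconds per poll
        # A timeout keeps one stalled request from hanging the whole loop.
        resp = requests.get(url, headers={"api-key": API_KEY}, timeout=30)
        resp.raise_for_status()  # surface HTTP errors instead of failing inside .json()
        job = resp.json()
        if job.get("status") in ("completed", "review"):
            return job
        if job.get("status") == "failed":
            raise RuntimeError(job.get("error", "job failed"))
        time.sleep(4)
    raise TimeoutError(f"Job at {url} did not finish in time")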

with open("pile.pdf", "rb") as f:
    resp = requests.post(
        f"{BASE_URL}/splitter",
        headers={"api-key": API_KEY},
        files={"file": ("pile.pdf", f, "application/pdf")},
        data={"categories": json.dumps(CATEGORIES)},
    )
resp.raise_for_status()
job = resp.json()
sections = wait_for(f"{BASE_URL}/splitter/{job['job_id']}")["result"]["files"]
print(f"Split into {len(sections)} documents: {[s['name'] for s in sections]}")
Note the form field is file here, not pdf_file. The splitter and the extractor use different field names.
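The polling helper calls the network directly, which makes it awkward to test. One possible variant, sketched here under assumptions, takes the status lookup and the sleep function as arguments so the loop can be exercised with fakes. The name wait_for_job and the "processing" status in the demo are hypothetical; only "completed", "review", and "failed" come from the script above.

```python
import time


def wait_for_job(fetch_status, attempts=90, delay=4.0, sleep=time.sleep):
    """Poll fetch_status() until the job finishes, then return the final payload.

    fetch_status is any zero-argument callable that returns the job dict,
    so a test can pass a fake instead of a real HTTP request.
    """
    for _ in range(attempts):
        job = fetch_status()
        status = job.get("status")
        if status in ("completed", "review"):
            return job
        if status == "failed":
            raise RuntimeError(job.get("error", "job failed"))
        sleep(delay)
    raise TimeoutError(f"Job did not finish after {attempts} polls")


# Demo with a fake job that completes on the third poll (no network involved).
# "processing" is an assumed in-progress status, used only for illustration.
responses = iter([
    {"status": "processing"},
    {"status": "processing"},
    {"status": "completed", "result": {"files": []}},
])
naps = []  # records each requested delay instead of actually sleeping
final = wait_for_job(lambda: next(responses), sleep=naps.append)
print(final["status"], len(naps))
```

In the real script, fetch_status could be a lambda that wraps the same requests.get call the helper already makes.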
Each entry in sections describes one split-out document. The two fields we use are name (the category it matched, with a .pdf suffix, such as Invoice.pdf) and full_path (a URL to the new single-document PDF). Those URLs are signed S3 links that expire roughly an hour after the response, so we use them right away in the next step rather than storing them. If a link does lapse, re-issue GET /splitter/{job_id} to get fresh ones.
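If the extraction step might be delayed, it can help to record when the split result was fetched and check the age of the links before using them. The sketch below is a minimal illustration: the one-hour lifetime mirrors the "roughly an hour" figure above, while the safety margin and the helper name links_are_fresh are assumptions, not part of the API.

```python
import time

LINK_TTL_SECONDS = 60 * 60      # "roughly an hour", per the text above
SAFETY_MARGIN_SECONDS = 5 * 60  # hypothetical buffer; adjust to taste


def links_are_fresh(fetched_at, now=None):
    """Return True while links fetched at `fetched_at` are safely inside their lifetime.

    Both arguments are Unix timestamps in seconds; `now` defaults to the current time.
    """
    if now is None:
        now = time.time()
    return (now - fetched_at) < (LINK_TTL_SECONDS - SAFETY_MARGIN_SECONDS)


# Links fetched 10 minutes ago are still usable; links fetched 58 minutes ago are not.
print(links_are_fresh(fetched_at=0, now=10 * 60))
print(links_are_fresh(fetched_at=0, now=58 * 60))
```

When the check fails, the script would re-issue GET /splitter/{job_id} as described above and continue with the refreshed sections.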

Step 3: Extract the Right Fields From Each Document

Now we loop over the split files. For each one, we look up the schema that matches its type and send the file’s URL to /v2/extract. Passing file_url means the extractor pulls the document straight from the split result, so we never download and re-upload it.

3.1 Define a Schema per Type

Each document type wants different fields. We keep one schema per category in a dictionary, keyed by the same names we gave the splitter, so we can look up the right schema by the file’s category.
Add the schema map below the categories:
sort_and_extract.py
SCHEMAS = {
    "Invoice": {
        "type": "object",
        "properties": {
            "vendor_name": {"type": "string", "description": "Company issuing the invoice (the seller)"},
            "invoice_number": {"type": "string", "description": "Invoice identifier"},
            "total_due": {"type": "number", "description": "Total amount due in US dollars"},
        },
        "required": ["vendor_name", "invoice_number", "total_due"],
        "additionalProperties": False,
    },
    "Receipt": {
        "type": "object",
        "properties": {
            "store_name": {"type": "string", "description": "Name of the store"},
            "transaction_id": {"type": "string", "description": "Transaction or receipt ID"},
            "total": {"type": "number", "description": "Total amount paid in US dollars"},
        },
        "required": ["store_name", "transaction_id", "total"],
        "additionalProperties": False,
    },
    "Purchase Order": {
        "type": "object",
        "properties": {
            "po_number": {"type": "string", "description": "Purchase order number"},
            "vendor_name": {"type": "string", "description": "Vendor the order is placed with"},
            "required_by": {"type": "string", "description": "Date the order is required by"},
        },
        "required": ["po_number", "vendor_name", "required_by"],
        "additionalProperties": False,
    },
}
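Because the lookup depends on the schema keys matching the category names exactly, a typo in either place would cause a document type to be skipped silently. A small check at startup can catch that. This is a sketch; check_schema_coverage is a hypothetical helper, and the demo data is trimmed for brevity.

```python
def check_schema_coverage(categories, schemas):
    """Compare category names against schema keys.

    Returns (missing, unused): category names that have no schema,
    and schema keys that match no category name.
    """
    names = [category["name"] for category in categories]
    missing = [name for name in names if name not in schemas]
    unused = [key for key in schemas if key not in names]
    return missing, unused


# Demo: the third schema key is lowercase, so it does not match its category.
demo_categories = [{"name": "Invoice"}, {"name": "Receipt"}, {"name": "Purchase Order"}]
demo_schemas = {"Invoice": {}, "Receipt": {}, "purchase order": {}}
missing, unused = check_schema_coverage(demo_categories, demo_schemas)
print(missing, unused)
```

Running the same check against the real CATEGORIES and SCHEMAS should return two empty lists.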

3.2 Extract Each Section by URL

Loop over the split files, look up the schema for each category, and submit an extract job for each one. We strip the .pdf suffix from the file name to get the category key, and skip any type we don’t have a schema for. Submitting every job before polling any of them matters here: the extractor fetches each file_url at submission time, so the signed links get consumed while they’re fresh instead of waiting behind another job’s polling loop, and the jobs run in parallel on the server rather than one at a time.
Add the extraction loop:
sort_and_extract.py
extract_jobs = {}
for section in sections:
    category = os.path.splitext(section["name"])[0]
    schema = SCHEMAS.get(category)
    if schema is None:
        print(f"No schema for '{category}', skipping.")
        continue
    resp = requests.post(
        f"{BASE_URL}/v2/extract",
        headers={"api-key": API_KEY},
        data={
            "file_url": section["full_path"],
            "schema_data": json.dumps(schema),
            "model": "gamma",
        },
    )
    resp.raise_for_status()
    extract_jobs[category] = resp.json()["job_id"]

results = {}
for category, job_id in extract_jobs.items():
    done = wait_for(f"{BASE_URL}/extract/{job_id}")
    results[category] = {field: cell["value"] for field, cell in done["result"].items()}
Because we extract by URL, there’s no file upload here, so everything goes in the data form fields instead of files.
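The routing decision in the loop (strip the suffix, look up the schema, skip unknown types) can also be separated from the HTTP calls, which makes it easy to check on its own. The following is one possible sketch; route_sections is a hypothetical helper, and the example.com URLs are placeholders standing in for the signed links.

```python
import os


def route_sections(sections, schemas):
    """Pair each split file with its schema.

    Returns (routed, skipped): routed is a list of (category, url, schema)
    tuples ready to submit; skipped lists file names with no matching schema.
    """
    routed, skipped = [], []
    for section in sections:
        category = os.path.splitext(section["name"])[0]  # "Invoice.pdf" -> "Invoice"
        schema = schemas.get(category)
        if schema is None:
            skipped.append(section["name"])
        else:
            routed.append((category, section["full_path"], schema))
    return routed, skipped


# Demo with made-up sections; "Contract" has no schema, so it is skipped.
demo_sections = [
    {"name": "Invoice.pdf", "full_path": "https://example.com/invoice.pdf"},
    {"name": "Purchase Order.pdf", "full_path": "https://example.com/po.pdf"},
    {"name": "Contract.pdf", "full_path": "https://example.com/contract.pdf"},
]
demo_schemas = {"Invoice": {"type": "object"}, "Purchase Order": {"type": "object"}}
routed, skipped = route_sections(demo_sections, demo_schemas)
print([category for category, _, _ in routed], skipped)
```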

Step 4: Print the Combined Result

The results object now holds the extracted fields for every document in the pile, keyed by type. Print it as formatted JSON.
Add the final line and run the script:
sort_and_extract.py
print(json.dumps(results, indent=2))
python sort_and_extract.py

Sample Output

Run against the sample pile, the script splits the three pages into three labelled documents, then extracts the fields each type calls for:
Split into 3 documents: ['Invoice.pdf', 'Receipt.pdf', 'Purchase Order.pdf']
{
  "Invoice": {
    "vendor_name": "Greenfield Print & Bindery",
    "invoice_number": "GPB-20418",
    "total_due": 2150.0
  },
  "Receipt": {
    "store_name": "Cooper's Office Supply",
    "transaction_id": "7741-208-4490",
    "total": 87.61
  },
  "Purchase Order": {
    "po_number": "PO-2026-0813",
    "vendor_name": "Westbrook Furniture Co.",
    "required_by": "June 5, 2026"
  }
}
One PDF went in; three correctly typed records came out, each carrying only the fields that make sense for its document type. A receipt never gets asked for an invoice number, and the purchase order’s vendor is pulled from the right place on the page.
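Before passing records downstream, it may be worth confirming that each one has the fields its schema marks as required and that the values have the declared types. The sketch below covers only the two types used in this recipe's schemas ("string" and "number"); check_record is a hypothetical helper, not part of the API.

```python
def check_record(record, schema):
    """Return a list of problems found in one extracted record (empty if none)."""
    type_map = {"string": str, "number": (int, float)}
    problems = []
    for field in schema.get("required", []):
        if record.get(field) is None:
            problems.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        value = record.get(field)
        expected = type_map.get(spec.get("type"))
        if value is None or expected is None:
            continue  # absence is reported above; unknown types are not checked
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"{field}: expected {spec['type']}, got {type(value).__name__}")
    return problems


# Demo using the Receipt schema and the sample output shown above.
receipt_schema = {
    "properties": {
        "store_name": {"type": "string"},
        "transaction_id": {"type": "string"},
        "total": {"type": "number"},
    },
    "required": ["store_name", "transaction_id", "total"],
}
good = {"store_name": "Cooper's Office Supply", "transaction_id": "7741-208-4490", "total": 87.61}
bad = {"store_name": "Cooper's Office Supply", "total": "87.61"}
print(check_record(good, receipt_schema))
print(check_record(bad, receipt_schema))
```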

Where to Take This Next

The pile in this recipe had one page per document, but the splitter handles multi-page documents too, grouping consecutive pages that belong together. To process a folder of mixed PDFs, wrap the whole script in a loop over your files and merge each run’s results into one collection.
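The folder loop could look something like the sketch below. It assumes the body of the script has been wrapped in a function that takes a path and returns the per-type results; that function (called process_pile here) and the helper process_folder are hypothetical names, and the demo substitutes a fake so no API calls are made.

```python
import tempfile
from pathlib import Path


def process_folder(folder, process_pile):
    """Run process_pile on every PDF in folder and merge the results by file name.

    process_pile(path) is assumed to return a dict keyed by document type.
    A failure on one file is recorded and does not stop the rest.
    """
    collection = {}
    for pdf in sorted(Path(folder).glob("*.pdf")):
        try:
            collection[pdf.name] = process_pile(pdf)
        except Exception as exc:  # broad on purpose: keep going past one bad file
            collection[pdf.name] = {"error": str(exc)}
    return collection


# Demo with a stand-in for the real pipeline.
def fake_process_pile(path):
    if path.name == "broken.pdf":
        raise ValueError("could not split")
    return {"Invoice": {"invoice_number": f"from-{path.stem}"}}


with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.pdf", "broken.pdf", "notes.txt"):
        (Path(tmp) / name).write_bytes(b"")
    merged = process_folder(tmp, fake_process_pile)

print(sorted(merged))
```

Keying the collection by file name keeps two piles that both contain an invoice from overwriting each other.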

Splitting

How the splitter decides document boundaries, and the full response reference.

Classification

Label a single document by type without splitting it.

Extraction Schemas

Build richer per-type schemas, including nested objects and line-item tables.

API Reference

Browse the full request and response specs for /splitter and /v2/extract.