

CSVDocumentSplitter divides CSV documents into smaller sub-tables based on empty rows and columns. This is useful for handling structured data that contains multiple tables, improving data processing efficiency and retrieval.

Most common position in a pipeline: In indexing pipelines after Converters, before CSVDocumentCleaner
Mandatory run variables: "documents": A list of documents with CSV-formatted content
Output variables: "documents": A list of documents, each containing a sub-table extracted from the original CSV file
API reference: PreProcessors
GitHub link


CSVDocumentSplitter expects a list of documents containing CSV-formatted content and returns a list of new Document objects, each representing a sub-table extracted from the original document. The splitting process follows these rules:

  1. Row-Based Splitting: If row_split_threshold is set, consecutive empty rows equalling or exceeding this threshold trigger a split.
  2. Column-Based Splitting: If column_split_threshold is set, consecutive empty columns equalling or exceeding this threshold trigger a split.
  3. Recursive Splitting: If both thresholds are provided, CSVDocumentSplitter first splits by rows and then by columns. If a resulting sub-table still contains a run of empty rows that meets the threshold, the splitting process is applied to it again. This ensures that sub-tables are fully separated.
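
The three rules above can be sketched in plain Python. This is an illustration of the idea only, not the component's actual implementation: the helper names (is_blank, split_on_blank_runs, split_table) and the sample CSV text are assumptions made for this example, and the input is assumed to be rectangular (every row has the same number of cells).

```python
import csv
import io


def is_blank(cells):
    """True when every cell is empty or whitespace."""
    return all(cell.strip() == "" for cell in cells)


def split_on_blank_runs(blank_flags, threshold):
    """Return (start, end) index ranges of the non-blank stretches.

    A run of blank entries separates two stretches only when it is at
    least `threshold` long; shorter runs stay inside the stretch.
    """
    ranges = []
    start = None      # first index of the current stretch
    last_full = None  # last non-blank index seen in the current stretch
    blank_run = 0
    for i, blank in enumerate(blank_flags):
        if blank:
            blank_run += 1
            if start is not None and blank_run >= threshold:
                ranges.append((start, last_full + 1))
                start = None
        else:
            if start is None:
                start = i
            last_full = i
            blank_run = 0
    if start is not None:
        ranges.append((start, last_full + 1))
    return ranges


def split_table(rows, row_threshold, col_threshold, row0=0, col0=0):
    """Split by rows, then by columns, and repeat on every piece that was cut.

    Returns a list of (row_idx_start, col_idx_start, sub_rows) tuples, with
    indices relative to the original table.
    """
    row_ranges = split_on_blank_runs([is_blank(r) for r in rows], row_threshold)
    results = []
    for r_start, r_end in row_ranges:
        block = rows[r_start:r_end]
        columns = list(zip(*block))
        col_ranges = split_on_blank_runs([is_blank(c) for c in columns], col_threshold)
        for c_start, c_end in col_ranges:
            sub = [list(row[c_start:c_end]) for row in block]
            if len(row_ranges) == 1 and len(col_ranges) == 1:
                # Nothing was cut at this level, so this piece is final.
                results.append((row0 + r_start, col0 + c_start, sub))
            else:
                # A cut happened: check the smaller piece again.
                results.extend(
                    split_table(sub, row_threshold, col_threshold,
                                row0 + r_start, col0 + c_start)
                )
    return results


# Sample data: two tables side by side, then a third below an empty row.
text = "A,B,,X,Y\n1,2,,3,4\n,,,,\nC,D,,,\n5,6,,,\n"
rows = list(csv.reader(io.StringIO(text)))
tables = split_table(rows, row_threshold=1, col_threshold=1)
for row_idx_start, col_idx_start, sub in tables:
    print(row_idx_start, col_idx_start, sub)
```

With both thresholds set to 1, the sample yields three sub-tables starting at (row, column) positions (0, 0), (0, 3), and (3, 0). Raising either threshold to 2 would leave the single empty row or column inside its table instead of splitting on it.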

Each extracted sub-table retains metadata from the original document and includes additional fields:

  • source_id: The ID of the original document
  • row_idx_start: The starting row index of the sub-table in the original document
  • col_idx_start: The starting column index of the sub-table in the original document
  • split_id: The sequential ID of the split within the document

This component is especially useful for document processing pipelines that require structured data to be extracted and stored efficiently.
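
As a rough illustration of the metadata fields listed above, the sketch below builds the metadata for each sub-table from the original document's metadata. The helper build_split_meta, the sample ID and file path, and the choice to number split_id from 0 are all assumptions made for this example; check the API reference for the component's exact behavior.

```python
def build_split_meta(source_id, source_meta, row_idx_start, col_idx_start, split_id):
    """Copy the original metadata and add the four split-specific fields."""
    meta = dict(source_meta)  # shallow copy so the original is not mutated
    meta.update(
        {
            "source_id": source_id,
            "row_idx_start": row_idx_start,
            "col_idx_start": col_idx_start,
            "split_id": split_id,
        }
    )
    return meta


# Hypothetical original document: an ID plus some pre-existing metadata.
original_id = "doc-1"
original_meta = {"file_path": "report.csv"}

# Hypothetical (row_idx_start, col_idx_start) positions of three sub-tables.
positions = [(0, 0), (0, 3), (3, 0)]

metas = [
    build_split_meta(original_id, original_meta, row, col, split_id)
    for split_id, (row, col) in enumerate(positions)
]
for meta in metas:
    print(meta)
```

Because every sub-table carries source_id together with its starting row and column, downstream code can group splits by their original document and place each one back at its position in the source table.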

Supported Document Stores

CSVDocumentSplitter is compatible with the following Document Stores:


On its own

You can use CSVDocumentSplitter outside of a pipeline to process CSV documents directly:

from haystack import Document
from haystack.components.preprocessors import CSVDocumentSplitter

splitter = CSVDocumentSplitter(row_split_threshold=1, column_split_threshold=1)

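# Illustrative sample content: two tables side by side, then a third below an empty row
doc = Document(
    content="A,B,,X,Y\n1,2,,3,4\n,,,,\nC,D,,,\n5,6,,,\n"
)
split_result = splitter.run([doc])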
print(split_result["documents"])  # List of split tables as Documents

In a pipeline

Here's how you can integrate CSVDocumentSplitter into a Haystack indexing pipeline:

from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.converters.csv import CSVToDocument
from haystack.components.preprocessors import CSVDocumentSplitter
from haystack.components.preprocessors import CSVDocumentCleaner
from haystack.components.writers import DocumentWriter

# Initialize components
document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=CSVToDocument(), name="csv_file_converter")
p.add_component(instance=CSVDocumentSplitter(), name="splitter")
p.add_component(instance=CSVDocumentCleaner(), name="cleaner")
p.add_component(instance=DocumentWriter(document_store=document_store), name="writer")

# Connect components
p.connect("csv_file_converter.documents", "splitter.documents")
p.connect("splitter.documents", "cleaner.documents")
p.connect("cleaner.documents", "writer.documents")

# Run pipeline
p.run({"csv_file_converter": {"sources": ["path/to/your/file.csv"]}})

This pipeline extracts CSV content, splits it into structured sub-tables, cleans the CSV documents by removing empty rows and columns, and stores the resulting documents in the Document Store for further retrieval and processing.