Skip to content

Batch Processor

Batch processing utilities for adding multiple papers to vector store.

_batch_embed(chunks, ids, store, batch_size, has_gpu)

Embed chunks in batches and verify insertion exactly as before.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/batch_processor.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def _batch_embed(
    chunks: List[Document],
    ids: List[str],
    store: Any,
    batch_size: int,
    has_gpu: bool,
) -> None:
    """Embed chunks in batches and verify insertion exactly as before."""
    t0 = time.time()
    total = len(chunks)
    device = "GPU" if has_gpu else "CPU"
    num_batches = (total + batch_size - 1) // batch_size

    logger.info(
        "Starting BATCH EMBEDDING of %d chunks in batches of %d (%s)",
        total,
        batch_size,
        device,
    )

    for batch_num in range(1, num_batches + 1):
        lo = (batch_num - 1) * batch_size
        hi = min(lo + batch_size, total)
        logger.info(
            "Embedding batch %d/%d (chunks %d-%d of %d) - %s",
            batch_num,
            num_batches,
            lo + 1,
            hi,
            total,
            device,
        )

        store.add_documents(documents=chunks[lo:hi], ids=ids[lo:hi])

        # Post-insert verification: flush so num_entities reflects this batch.
        collection = store.col
        collection.flush()
        entity_count = collection.num_entities
        logger.info(
            "Post-insert batch %d: collection has %d entities",
            batch_num,
            entity_count,
        )
        if entity_count:
            sample_rows = collection.query(
                expr="", output_fields=["paper_id"], limit=3
            )
            logger.info(
                "Sample paper IDs: %s",
                [row.get("paper_id", "unknown") for row in sample_rows],
            )

        logger.info("Successfully stored batch %d", batch_num)

    elapsed = time.time() - t0
    logger.info(
        "BATCH EMBEDDING COMPLETE: %d chunks in %.2f seconds (%.2f chunks/sec)",
        total,
        elapsed,
        total / elapsed if elapsed > 0 else 0,
    )

_parallel_load_and_split(papers, config, metadata_fields, documents, max_workers)

Load & split PDFs in parallel, preserving original logic.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/batch_processor.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def _parallel_load_and_split(
    papers: List[Tuple[str, str, Dict[str, Any]]],
    config: Any,
    metadata_fields: List[str],
    documents: Dict[str, Document],
    max_workers: int,
) -> Tuple[List[Document], List[str], List[str]]:
    """Load & split PDFs in parallel, preserving original logic."""
    chunks_out: List[Document] = []
    ids_out: List[str] = []
    loaded_pids: List[str] = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its paper id for progress reporting.
        future_to_pid = {}
        for paper_id, pdf_url, metadata in papers:
            future = pool.submit(
                load_and_split_pdf,
                paper_id,
                pdf_url,
                metadata,
                config,
                metadata_fields=metadata_fields,
                documents_dict=documents,
            )
            future_to_pid[future] = paper_id
        logger.info("Submitted %d PDF loading tasks", len(future_to_pid))

        done_count = 0
        for future in concurrent.futures.as_completed(future_to_pid):
            done_count += 1
            paper_id = future_to_pid[future]
            # .result() re-raises any loader exception, as in the original.
            paper_chunks = future.result()

            chunks_out.extend(paper_chunks)
            ids_out.extend(f"{paper_id}_{i}" for i in range(len(paper_chunks)))
            loaded_pids.append(paper_id)

            logger.info(
                "Progress: %d/%d - Loaded paper %s (%d chunks)",
                done_count,
                len(papers),
                paper_id,
                len(paper_chunks),
            )

    return chunks_out, ids_out, loaded_pids

add_papers_batch(papers_to_add, vector_store, loaded_papers, paper_metadata, documents, **kwargs)

Add multiple papers to the document store in parallel with batch embedding.

Parameters:

Name Type Description Default
papers_to_add List[Tuple[str, str, Dict[str, Any]]]

List of tuples (paper_id, pdf_url, paper_metadata).

required
vector_store Any

The LangChain Milvus vector store instance.

required
loaded_papers Set[str]

Set to track which papers are already loaded.

required
paper_metadata Dict[str, Dict[str, Any]]

Dict to store paper metadata after load.

required
documents Dict[str, Document]

Dict to store document chunks.

required
config

(via kwargs) Configuration object.

required
metadata_fields

(via kwargs) List of metadata fields to include.

required
has_gpu

(via kwargs) Whether GPU is available.

required
max_workers

(via kwargs) Max PDF-loading threads (default 5).

required
batch_size

(via kwargs) Embedding batch size (default 100).

required
Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/batch_processor.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def add_papers_batch(
    papers_to_add: List[Tuple[str, str, Dict[str, Any]]],
    vector_store: Any,
    loaded_papers: Set[str],
    paper_metadata: Dict[str, Dict[str, Any]],
    documents: Dict[str, Document],
    **kwargs: Any,
) -> None:
    """
    Add multiple papers to the document store in parallel with batch embedding.

    Args:
        papers_to_add: List of tuples (paper_id, pdf_url, paper_metadata).
        vector_store: The LangChain Milvus vector store instance.
        loaded_papers: Set to track which papers are already loaded.
        paper_metadata: Dict to store paper metadata after load.
        documents: Dict to store document chunks.
        config:           (via kwargs) Configuration object.
        metadata_fields:  (via kwargs) List of metadata fields to include.
        has_gpu:          (via kwargs) Whether GPU is available.
        max_workers:      (via kwargs) Max PDF-loading threads (default 5).
        batch_size:       (via kwargs) Embedding batch size (default 100).
    """
    cfg = kwargs

    if not papers_to_add:
        logger.info("No papers to add")
        return

    to_process = [
        (pid, url, md) for pid, url, md in papers_to_add if pid not in loaded_papers
    ]
    skipped = len(papers_to_add) - len(to_process)
    if not to_process:
        logger.info("Skipping %d already-loaded papers", len(papers_to_add))
        logger.info("All %d papers are already loaded", len(papers_to_add))
        return
    if skipped:
        # Fix: previously the partial-skip case was silent — only the
        # all-already-loaded branch logged the skip count.
        logger.info("Skipping %d already-loaded papers", skipped)

    # Hoist kwargs lookups so defaults are resolved exactly once.
    max_workers = cfg.get("max_workers", 5)
    has_gpu = cfg["has_gpu"]

    logger.info(
        "Starting PARALLEL batch processing of %d papers with %d workers (%s)",
        len(to_process),
        max_workers,
        "GPU acceleration" if has_gpu else "CPU processing",
    )

    chunks, ids, success = _parallel_load_and_split(
        to_process,
        cfg["config"],
        cfg["metadata_fields"],
        documents,
        max_workers,
    )

    if not chunks:
        logger.warning("No chunks to add to vector store")
        return

    # Record metadata only for papers that actually loaded successfully.
    succeeded = set(success)
    for pid, _, md in to_process:
        if pid in succeeded:
            paper_metadata[pid] = md

    try:
        _batch_embed(
            chunks,
            ids,
            vector_store,
            cfg.get("batch_size", 100),
            has_gpu,
        )
    except Exception:
        logger.error("Failed to add chunks to Milvus", exc_info=True)
        raise

    # Only mark papers as loaded after embedding succeeds.
    loaded_papers.update(success)