Zotero Read Helper

Utility for the Zotero read tool.

ZoteroSearchData

Helper class to organize Zotero search-related data.
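
A minimal usage sketch, assuming a valid Hydra config under tools/zotero_read and working Zotero credentials (the query string and limit below are placeholders):

from aiagents4pharma.talk2scholars.tools.zotero.utils.read_helper import (
    ZoteroSearchData,
)

# Placeholder arguments for illustration only
search = ZoteroSearchData(
    query="machine learning",
    only_articles=True,
    limit=5,
    download_pdfs=False,  # skip PDF downloads for a faster dry run
)
search.process_search()           # fetch, filter, and format items
results = search.get_search_results()
print(results["content"])         # human-readable summary
papers = results["article_data"]  # dict keyed by Zotero item key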

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
class ZoteroSearchData:
    """Helper class to organize Zotero search-related data."""

    def __init__(
        self,
        query: str,
        only_articles: bool,
        limit: int,
        download_pdfs: bool = True,
        **_kwargs,
    ):
        self.query = query
        self.only_articles = only_articles
        self.limit = limit
        # Control whether to fetch PDF attachments now
        self.download_pdfs = download_pdfs
        self.cfg = self._load_config()
        self.zot = self._init_zotero_client()
        self.item_to_collections = get_item_collections(self.zot)
        self.article_data = {}
        self.content = ""
        # Create a session for connection pooling
        self.session = requests.Session()

    def process_search(self) -> None:
        """Process the search request and prepare results."""
        items = self._fetch_items()
        self._filter_and_format_papers(items)
        self._create_content()

    def get_search_results(self) -> Dict[str, Any]:
        """Get the search results and content."""
        return {
            "article_data": self.article_data,
            "content": self.content,
        }

    def _load_config(self) -> Any:
        """Load hydra configuration."""
        with hydra.initialize(version_base=None, config_path="../../../configs"):
            cfg = hydra.compose(
                config_name="config", overrides=["tools/zotero_read=default"]
            )
            logger.info("Loaded configuration for Zotero search tool")
            return cfg.tools.zotero_read

    def _init_zotero_client(self) -> zotero.Zotero:
        """Initialize Zotero client."""
        logger.info(
            "Searching Zotero for query: '%s' (only_articles: %s, limit: %d)",
            self.query,
            self.only_articles,
            self.limit,
        )
        return zotero.Zotero(self.cfg.user_id, self.cfg.library_type, self.cfg.api_key)

    def _fetch_items(self) -> List[Dict[str, Any]]:
        """Fetch items from Zotero."""
        try:
            if self.query.strip() == "":
                logger.info(
                    "Empty query provided, fetching all items up to max_limit: %d",
                    self.cfg.zotero.max_limit,
                )
                items = self.zot.items(limit=self.cfg.zotero.max_limit)
            else:
                items = self.zot.items(
                    q=self.query, limit=min(self.limit, self.cfg.zotero.max_limit)
                )
        except Exception as e:
            logger.error("Failed to fetch items from Zotero: %s", e)
            raise RuntimeError(
                "Failed to fetch items from Zotero. Please retry the same query."
            ) from e

        logger.info("Received %d items from Zotero", len(items))

        if not items:
            logger.error("No items returned from Zotero for query: '%s'", self.query)
            raise RuntimeError(
                "No items returned from Zotero. Please retry the same query."
            )

        return items

    def _collect_item_attachments(self) -> Dict[str, str]:
        """Collect PDF attachment keys for non-orphan items."""
        item_attachments: Dict[str, str] = {}
        for item_key, item_data in self.article_data.items():
            if item_data.get("Type") == "orphan_attachment":
                continue
            try:
                children = self.zot.children(item_key)
                for child in children:
                    data = child.get("data", {})
                    if data.get("contentType") == "application/pdf":
                        attachment_key = data.get("key")
                        filename = data.get("filename", "unknown.pdf")
                        if attachment_key:
                            item_attachments[attachment_key] = item_key
                            self.article_data[item_key]["filename"] = filename
                            break
            except Exception as e:
                logger.error("Failed to get attachments for item %s: %s", item_key, e)
        return item_attachments

    def _process_orphaned_pdfs(self, orphaned_pdfs: Dict[str, str]) -> None:
        """Download or record orphaned PDF attachments."""
        if self.download_pdfs:
            logger.info("Downloading %d orphaned PDFs in parallel", len(orphaned_pdfs))
            results = download_pdfs_in_parallel(
                self.session,
                self.cfg.user_id,
                self.cfg.api_key,
                orphaned_pdfs,
                chunk_size=getattr(self.cfg, "chunk_size", None),
            )
            for item_key, (file_path, filename, attachment_key) in results.items():
                self.article_data[item_key]["filename"] = filename
                self.article_data[item_key]["pdf_url"] = file_path
                self.article_data[item_key]["attachment_key"] = attachment_key
                logger.info("Downloaded orphaned Zotero PDF to: %s", file_path)
        else:
            logger.info("Skipping orphaned PDF downloads (download_pdfs=False)")
            for attachment_key in orphaned_pdfs:
                self.article_data[attachment_key]["attachment_key"] = attachment_key
                self.article_data[attachment_key]["filename"] = (
                    self.article_data[attachment_key].get("Title", attachment_key)
                )

    def _process_item_pdfs(self, item_attachments: Dict[str, str]) -> None:
        """Download or record regular item PDF attachments."""
        if self.download_pdfs:
            logger.info(
                "Downloading %d regular item PDFs in parallel", len(item_attachments)
            )
            results = download_pdfs_in_parallel(
                self.session,
                self.cfg.user_id,
                self.cfg.api_key,
                item_attachments,
                chunk_size=getattr(self.cfg, "chunk_size", None),
            )
        else:
            logger.info("Skipping regular PDF downloads (download_pdfs=False)")
            results = {}
            for attachment_key, item_key in item_attachments.items():
                self.article_data[item_key]["attachment_key"] = attachment_key
        for item_key, (file_path, filename, attachment_key) in results.items():
            self.article_data[item_key]["filename"] = filename
            self.article_data[item_key]["pdf_url"] = file_path
            self.article_data[item_key]["attachment_key"] = attachment_key
            logger.info("Downloaded Zotero PDF to: %s", file_path)

    def _filter_and_format_papers(self, items: List[Dict[str, Any]]) -> None:
        """Filter and format papers from Zotero items, including standalone PDFs."""
        filter_item_types = (
            self.cfg.zotero.filter_item_types if self.only_articles else []
        )
        logger.debug("Filtering item types: %s", filter_item_types)

        # Maps to track attachments for batch processing
        orphaned_pdfs: Dict[str, str] = {}  # attachment_key -> item key (same for orphans)

        # First pass: process all items without downloading PDFs
        for item in items:
            if not isinstance(item, dict):
                continue

            data = item.get("data", {})
            item_type = data.get("itemType", "N/A")
            key = data.get("key")
            if not key:
                continue

            # CASE 1: Top-level item (e.g., journalArticle)
            if item_type != "attachment":
                collection_paths = self.item_to_collections.get(key, ["/Unknown"])

                self.article_data[key] = {
                    "Title": data.get("title", "N/A"),
                    "Abstract": data.get("abstractNote", "N/A"),
                    "Publication Date": data.get("date", "N/A"),
                    "URL": data.get("url", "N/A"),
                    "Type": item_type,
                    "Collections": collection_paths,
                    "Citation Count": data.get("citationCount", "N/A"),
                    "Venue": data.get("venue", "N/A"),
                    "Publication Venue": data.get("publicationTitle", "N/A"),
                    "Journal Name": data.get("journalAbbreviation", "N/A"),
                    "Authors": [
                        f"{creator.get('firstName', '')} {creator.get('lastName', '')}".strip()
                        for creator in data.get("creators", [])
                        if isinstance(creator, dict)
                        and creator.get("creatorType") == "author"
                    ],
                    "source": "zotero",
                }
                # We'll collect attachment info in second pass

            # CASE 2: Standalone orphaned PDF attachment
            elif data.get("contentType") == "application/pdf" and not data.get(
                "parentItem"
            ):
                attachment_key = key
                filename = data.get("filename", "unknown.pdf")

                # Add to orphaned PDFs for batch processing
                orphaned_pdfs[attachment_key] = (
                    attachment_key  # Same key as both attachment and "item"
                )

                # Create the entry without PDF info yet
                self.article_data[key] = {
                    "Title": filename,
                    "Abstract": "No abstract available",
                    "Publication Date": "N/A",
                    "URL": "N/A",
                    "Type": "orphan_attachment",
                    "Collections": ["/(No Collection)"],
                    "Citation Count": "N/A",
                    "Venue": "N/A",
                    "Publication Venue": "N/A",
                    "Journal Name": "N/A",
                    "Authors": ["(Unknown)"],
                    "source": "zotero",
                }

        # Collect and process attachments
        item_attachments = self._collect_item_attachments()

        # Process orphaned PDFs
        self._process_orphaned_pdfs(orphaned_pdfs)

        # Process regular item PDFs
        self._process_item_pdfs(item_attachments)

        # Ensure we have some results
        if not self.article_data:
            logger.error(
                "No matching papers returned from Zotero for query: '%s'", self.query
            )
            raise RuntimeError(
                "No matching papers returned from Zotero. Please retry the same query."
            )

        logger.info(
            "Filtered %d items (including orphaned attachments)", len(self.article_data)
        )

    def _create_content(self) -> None:
        """Create the content message for the response."""
        top_papers = list(self.article_data.values())[:2]
        top_papers_info = "\n".join(
            [
                f"{i+1}. {paper['Title']} ({paper['Type']})"
                for i, paper in enumerate(top_papers)
            ]
        )

        self.content = "Retrieval was successful. Papers are attached as an artifact."
        self.content += " And here is a summary of the retrieval results:\n"
        self.content += f"Number of papers found: {len(self.article_data)}\n"
        self.content += f"Query: {self.query}\n"
        self.content += "Here are a few of these papers:\n" + top_papers_info

_collect_item_attachments()

Collect PDF attachment keys for non-orphan items.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _collect_item_attachments(self) -> Dict[str, str]:
    """Collect PDF attachment keys for non-orphan items."""
    item_attachments: Dict[str, str] = {}
    for item_key, item_data in self.article_data.items():
        if item_data.get("Type") == "orphan_attachment":
            continue
        try:
            children = self.zot.children(item_key)
            for child in children:
                data = child.get("data", {})
                if data.get("contentType") == "application/pdf":
                    attachment_key = data.get("key")
                    filename = data.get("filename", "unknown.pdf")
                    if attachment_key:
                        item_attachments[attachment_key] = item_key
                        self.article_data[item_key]["filename"] = filename
                        break
        except Exception as e:
            logger.error("Failed to get attachments for item %s: %s", item_key, e)
    return item_attachments
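
For illustration, the returned mapping associates each PDF attachment key with its parent item key, and is later consumed by _process_item_pdfs; the keys below are made up:

# Hypothetical return value (placeholder keys)
item_attachments = {
    "ATTACH123": "ITEM456",  # attachment key -> parent item key
    "ATTACH789": "ITEMABC",
}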

_create_content()

Create the content message for the response.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _create_content(self) -> None:
    """Create the content message for the response."""
    top_papers = list(self.article_data.values())[:2]
    top_papers_info = "\n".join(
        [
            f"{i+1}. {paper['Title']} ({paper['Type']})"
            for i, paper in enumerate(top_papers)
        ]
    )

    self.content = "Retrieval was successful. Papers are attached as an artifact."
    self.content += " And here is a summary of the retrieval results:\n"
    self.content += f"Number of papers found: {len(self.article_data)}\n"
    self.content += f"Query: {self.query}\n"
    self.content += "Here are a few of these papers:\n" + top_papers_info
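
With two illustrative entries in article_data, the assembled message would read roughly as follows (titles, counts, and query are placeholders):

Retrieval was successful. Papers are attached as an artifact. And here is a summary of the retrieval results:
Number of papers found: 2
Query: machine learning
Here are a few of these papers:
1. An Example Paper (journalArticle)
2. example.pdf (orphan_attachment)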

_fetch_items()

Fetch items from Zotero.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _fetch_items(self) -> List[Dict[str, Any]]:
    """Fetch items from Zotero."""
    try:
        if self.query.strip() == "":
            logger.info(
                "Empty query provided, fetching all items up to max_limit: %d",
                self.cfg.zotero.max_limit,
            )
            items = self.zot.items(limit=self.cfg.zotero.max_limit)
        else:
            items = self.zot.items(
                q=self.query, limit=min(self.limit, self.cfg.zotero.max_limit)
            )
    except Exception as e:
        logger.error("Failed to fetch items from Zotero: %s", e)
        raise RuntimeError(
            "Failed to fetch items from Zotero. Please retry the same query."
        ) from e

    logger.info("Received %d items from Zotero", len(items))

    if not items:
        logger.error("No items returned from Zotero for query: '%s'", self.query)
        raise RuntimeError(
            "No items returned from Zotero. Please retry the same query."
        )

    return items

_filter_and_format_papers(items)

Filter and format papers from Zotero items, including standalone PDFs.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _filter_and_format_papers(self, items: List[Dict[str, Any]]) -> None:
    """Filter and format papers from Zotero items, including standalone PDFs."""
    filter_item_types = (
        self.cfg.zotero.filter_item_types if self.only_articles else []
    )
    logger.debug("Filtering item types: %s", filter_item_types)

    # Maps to track attachments for batch processing
    orphaned_pdfs: Dict[str, str] = {}  # attachment_key -> item key (same for orphans)

    # First pass: process all items without downloading PDFs
    for item in items:
        if not isinstance(item, dict):
            continue

        data = item.get("data", {})
        item_type = data.get("itemType", "N/A")
        key = data.get("key")
        if not key:
            continue

        # CASE 1: Top-level item (e.g., journalArticle)
        if item_type != "attachment":
            collection_paths = self.item_to_collections.get(key, ["/Unknown"])

            self.article_data[key] = {
                "Title": data.get("title", "N/A"),
                "Abstract": data.get("abstractNote", "N/A"),
                "Publication Date": data.get("date", "N/A"),
                "URL": data.get("url", "N/A"),
                "Type": item_type,
                "Collections": collection_paths,
                "Citation Count": data.get("citationCount", "N/A"),
                "Venue": data.get("venue", "N/A"),
                "Publication Venue": data.get("publicationTitle", "N/A"),
                "Journal Name": data.get("journalAbbreviation", "N/A"),
                "Authors": [
                    f"{creator.get('firstName', '')} {creator.get('lastName', '')}".strip()
                    for creator in data.get("creators", [])
                    if isinstance(creator, dict)
                    and creator.get("creatorType") == "author"
                ],
                "source": "zotero",
            }
            # We'll collect attachment info in second pass

        # CASE 2: Standalone orphaned PDF attachment
        elif data.get("contentType") == "application/pdf" and not data.get(
            "parentItem"
        ):
            attachment_key = key
            filename = data.get("filename", "unknown.pdf")

            # Add to orphaned PDFs for batch processing
            orphaned_pdfs[attachment_key] = (
                attachment_key  # Same key as both attachment and "item"
            )

            # Create the entry without PDF info yet
            self.article_data[key] = {
                "Title": filename,
                "Abstract": "No abstract available",
                "Publication Date": "N/A",
                "URL": "N/A",
                "Type": "orphan_attachment",
                "Collections": ["/(No Collection)"],
                "Citation Count": "N/A",
                "Venue": "N/A",
                "Publication Venue": "N/A",
                "Journal Name": "N/A",
                "Authors": ["(Unknown)"],
                "source": "zotero",
            }

    # Collect and process attachments
    item_attachments = self._collect_item_attachments()

    # Process orphaned PDFs
    self._process_orphaned_pdfs(orphaned_pdfs)

    # Process regular item PDFs
    self._process_item_pdfs(item_attachments)

    # Ensure we have some results
    if not self.article_data:
        logger.error(
            "No matching papers returned from Zotero for query: '%s'", self.query
        )
        raise RuntimeError(
            "No matching papers returned from Zotero. Please retry the same query."
        )

    logger.info(
        "Filtered %d items (including orphaned attachments)", len(self.article_data)
    )
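
A top-level entry in article_data ends up shaped roughly as below; all values are illustrative, and filename, pdf_url, and attachment_key are only added later when a PDF attachment is found:

# Hypothetical article_data entry (placeholder key and values)
article_data["ITEM456"] = {
    "Title": "An Example Paper",
    "Abstract": "Example abstract text",
    "Publication Date": "2024-01-01",
    "URL": "https://example.org/paper",
    "Type": "journalArticle",
    "Collections": ["/Machine Learning"],
    "Citation Count": "N/A",
    "Venue": "N/A",
    "Publication Venue": "Journal of Examples",
    "Journal Name": "J. Ex.",
    "Authors": ["Jane Doe"],
    "source": "zotero",
    # Added by _process_item_pdfs when download_pdfs=True:
    # "filename", "pdf_url", "attachment_key"
}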

_init_zotero_client()

Initialize Zotero client.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _init_zotero_client(self) -> zotero.Zotero:
    """Initialize Zotero client."""
    logger.info(
        "Searching Zotero for query: '%s' (only_articles: %s, limit: %d)",
        self.query,
        self.only_articles,
        self.limit,
    )
    return zotero.Zotero(self.cfg.user_id, self.cfg.library_type, self.cfg.api_key)
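
The client comes from pyzotero; an equivalent standalone construction, with placeholder credentials in place of the Hydra config values, would look like this:

from pyzotero import zotero

# Placeholder credentials; normally these come from cfg.user_id, cfg.library_type, cfg.api_key
zot = zotero.Zotero("1234567", "user", "YOUR_API_KEY")
items = zot.items(q="machine learning", limit=5)  # same call shape used in _fetch_items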

_load_config()

Load hydra configuration.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _load_config(self) -> Any:
    """Load hydra configuration."""
    with hydra.initialize(version_base=None, config_path="../../../configs"):
        cfg = hydra.compose(
            config_name="config", overrides=["tools/zotero_read=default"]
        )
        logger.info("Loaded configuration for Zotero search tool")
        return cfg.tools.zotero_read
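
The composed cfg.tools.zotero_read node is expected to carry at least the fields accessed elsewhere in this class; a hypothetical shape, with placeholder values:

# Hypothetical structure of cfg.tools.zotero_read (all values are placeholders):
#   user_id: "1234567"
#   library_type: "user"          # or "group"
#   api_key: "YOUR_API_KEY"
#   chunk_size: 10                # optional; forwarded to download_pdfs_in_parallel
#   zotero:
#     max_limit: 100              # cap on items fetched per request
#     filter_item_types: ["journalArticle", "conferencePaper"]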

_process_item_pdfs(item_attachments)

Download or record regular item PDF attachments.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _process_item_pdfs(self, item_attachments: Dict[str, str]) -> None:
    """Download or record regular item PDF attachments."""
    if self.download_pdfs:
        logger.info(
            "Downloading %d regular item PDFs in parallel", len(item_attachments)
        )
        results = download_pdfs_in_parallel(
            self.session,
            self.cfg.user_id,
            self.cfg.api_key,
            item_attachments,
            chunk_size=getattr(self.cfg, "chunk_size", None),
        )
    else:
        logger.info("Skipping regular PDF downloads (download_pdfs=False)")
        results = {}
        for attachment_key, item_key in item_attachments.items():
            self.article_data[item_key]["attachment_key"] = attachment_key
    for item_key, (file_path, filename, attachment_key) in results.items():
        self.article_data[item_key]["filename"] = filename
        self.article_data[item_key]["pdf_url"] = file_path
        self.article_data[item_key]["attachment_key"] = attachment_key
        logger.info("Downloaded Zotero PDF to: %s", file_path)
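
The results mapping from download_pdfs_in_parallel is consumed here as item_key -> (file_path, filename, attachment_key); an illustrative value, with placeholder paths and keys:

# Hypothetical results dict (placeholders)
results = {
    "ITEM456": ("/tmp/zotero/paper.pdf", "paper.pdf", "ATTACH123"),
}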

_process_orphaned_pdfs(orphaned_pdfs)

Download or record orphaned PDF attachments.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def _process_orphaned_pdfs(self, orphaned_pdfs: Dict[str, str]) -> None:
    """Download or record orphaned PDF attachments."""
    if self.download_pdfs:
        logger.info("Downloading %d orphaned PDFs in parallel", len(orphaned_pdfs))
        results = download_pdfs_in_parallel(
            self.session,
            self.cfg.user_id,
            self.cfg.api_key,
            orphaned_pdfs,
            chunk_size=getattr(self.cfg, "chunk_size", None),
        )
        for item_key, (file_path, filename, attachment_key) in results.items():
            self.article_data[item_key]["filename"] = filename
            self.article_data[item_key]["pdf_url"] = file_path
            self.article_data[item_key]["attachment_key"] = attachment_key
            logger.info("Downloaded orphaned Zotero PDF to: %s", file_path)
    else:
        logger.info("Skipping orphaned PDF downloads (download_pdfs=False)")
        for attachment_key in orphaned_pdfs:
            self.article_data[attachment_key]["attachment_key"] = attachment_key
            self.article_data[attachment_key]["filename"] = (
                self.article_data[attachment_key].get("Title", attachment_key)
            )

get_search_results()

Get the search results and content.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def get_search_results(self) -> Dict[str, Any]:
    """Get the search results and content."""
    return {
        "article_data": self.article_data,
        "content": self.content,
    }
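
The returned dictionary always carries these two keys; an illustrative value (entries are placeholders):

# Hypothetical return value of get_search_results()
results = {
    "article_data": {"ITEM456": {"Title": "An Example Paper", "Type": "journalArticle"}},
    "content": "Retrieval was successful. Papers are attached as an artifact. (summary follows)",
}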

process_search()

Process the search request and prepare results.

Source code in aiagents4pharma/talk2scholars/tools/zotero/utils/read_helper.py
def process_search(self) -> None:
    """Process the search request and prepare results."""
    items = self._fetch_items()
    self._filter_and_format_papers(items)
    self._create_content()