Skip to content

Singleton Manager

Singleton manager for Milvus connections and vector stores. Handles connection reuse, event loops, and GPU detection caching.

VectorstoreSingleton

Singleton manager for Milvus connections and vector stores.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/singleton_manager.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class VectorstoreSingleton:
    """Process-wide singleton caching Milvus connections and vector stores.

    All caches are class-level attributes, so they are shared by every
    reference to the singleton:

    * ``_connections``   maps ``"host:port/db_name"`` to a pymilvus alias.
    * ``_vector_stores`` maps a collection name to its vector store.
    * ``_event_loops``   maps a thread ident to that thread's event loop.
    * ``_gpu_detected``  holds the GPU probe result (``None`` = not probed).
    """

    _instance = None
    _lock = threading.Lock()  # Only used to serialise instance creation
    _connections = {}  # Store connections by connection string
    _vector_stores = {}  # Store vector stores by collection name
    _event_loops = {}  # Store event loops by thread ID
    _gpu_detected = None  # Cache GPU detection result

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        # Cheap unlocked check first; re-check under the lock so that two
        # threads racing through the first check cannot both allocate.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get_event_loop(self) -> asyncio.AbstractEventLoop:
        """Get or create a usable event loop for the current thread.

        Returns:
            The loop cached for this thread when it is still open; otherwise
            the thread's current loop, or a freshly created one that is also
            installed as the thread's current loop.
        """
        thread_id = threading.get_ident()

        # A cached loop may have been closed since it was stored, or may
        # belong to a finished thread whose ident was recycled, so it is
        # only reused while it is still open.
        cached_loop = self._event_loops.get(thread_id)
        if cached_loop is not None and not cached_loop.is_closed():
            return cached_loop

        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                raise RuntimeError("Event loop is closed")
        except RuntimeError:
            # No usable loop for this thread: create one and install it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        self._event_loops[thread_id] = loop
        logger.info("Created new event loop for thread %s", thread_id)

        return loop

    def detect_gpu_once(self) -> bool:
        """Detect GPU availability once and cache the result.

        Returns:
            The value produced by ``detect_nvidia_gpu()`` on the first call;
            later calls return the cached value without probing again.
        """
        if self._gpu_detected is None:
            self._gpu_detected = detect_nvidia_gpu()
            gpu_status = "available" if self._gpu_detected else "not available"
            logger.info("GPU detection completed: NVIDIA GPU %s", gpu_status)
        return self._gpu_detected

    def get_connection(self, host: str, port: int, db_name: str) -> str:
        """Get or create a Milvus connection and select ``db_name``.

        Args:
            host: Milvus server host.
            port: Milvus server port.
            db_name: Database to use; it is created when it does not exist.

        Returns:
            The pymilvus connection alias, which is always ``"default"``.

        Raises:
            MilvusException: Re-raised after logging when connecting or
                selecting the database fails.
        """
        conn_key = f"{host}:{port}/{db_name}"

        if conn_key not in self._connections:
            try:
                # Check if already connected
                if connections.has_connection("default"):
                    connections.remove_connection("default")

                # Every cached key maps to the one shared "default" alias,
                # which is about to point at a different server/database.
                # Forget the old keys so a later request for a previous
                # target reconnects instead of reusing the repointed alias.
                self._connections.clear()

                # Connect to Milvus
                connections.connect(
                    alias="default",
                    host=host,
                    port=port,
                )
                logger.info("Connected to Milvus at %s:%s", host, port)

                # Check if database exists, create if not
                existing_dbs = db.list_database()
                if db_name not in existing_dbs:
                    db.create_database(db_name)
                    logger.info("Created database: %s", db_name)

                # Use the database
                db.using_database(db_name)
                logger.info("Using database: %s", db_name)
                logger.debug(
                    "Milvus DB switched to: %s, available collections: %s",
                    db_name,
                    utility.list_collections(),
                )

                self._connections[conn_key] = "default"

            except MilvusException as e:
                logger.error("Failed to connect to Milvus: %s", e)
                raise

        return self._connections[conn_key]

    def get_vector_store(
        self,
        collection_name: str,
        embedding_model: Embeddings,
        connection_args: Dict[str, Any],
    ) -> Milvus:
        """Get or create a vector store for a collection.

        Args:
            collection_name: Collection name; also the cache key.
            embedding_model: Embedding function handed to ``Milvus``.
            connection_args: Mapping that must contain ``"host"`` and
                ``"port"``; both are only read when a store is created.

        Returns:
            The cached store for ``collection_name``, created on first use.

        Raises:
            KeyError: If a store must be created and ``connection_args``
                lacks ``"host"`` or ``"port"``.
        """
        # NOTE(review): the cache key ignores connection_args, so a
        # same-named collection on another server would get this store back
        # - confirm that is intended.
        if collection_name not in self._vector_stores:
            # Ensure event loop exists for this thread
            self.get_event_loop()

            # Create LangChain Milvus instance with explicit URI format
            # This ensures LangChain uses the correct host
            milvus_uri = f"http://{connection_args['host']}:{connection_args['port']}"

            vector_store = Milvus(
                embedding_function=embedding_model,
                collection_name=collection_name,
                connection_args={
                    "uri": milvus_uri,  # Use URI format instead of host/port
                    "host": connection_args["host"],
                    "port": connection_args["port"],
                },
                text_field="text",
                auto_id=False,
                drop_old=False,
                consistency_level="Strong",
            )

            self._vector_stores[collection_name] = vector_store
            logger.info(
                "Created new vector store for collection: %s with URI: %s",
                collection_name,
                milvus_uri,
            )

        return self._vector_stores[collection_name]

detect_gpu_once()

Detect GPU availability once and cache the result.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/singleton_manager.py
55
56
57
58
59
60
61
def detect_gpu_once(self) -> bool:
    """Return the cached GPU probe result, probing only on the first call.

    While ``self._gpu_detected`` is ``None`` the probe has not run yet: call
    ``detect_nvidia_gpu()``, store its result on ``self`` and log the outcome.
    Any other stored value is returned as-is without probing again.
    """
    cached = self._gpu_detected
    if cached is not None:
        return cached

    found = detect_nvidia_gpu()
    self._gpu_detected = found
    if found:
        label = "available"
    else:
        label = "not available"
    logger.info("GPU detection completed: NVIDIA GPU %s", label)
    return found

get_connection(host, port, db_name)

Get or create a Milvus connection.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/singleton_manager.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_connection(self, host: str, port: int, db_name: str) -> str:
    """Get or create a Milvus connection and select ``db_name``.

    Args:
        host: Milvus server host.
        port: Milvus server port.
        db_name: Database to use; it is created when it does not exist.

    Returns:
        The pymilvus connection alias, which is always ``"default"``.

    Raises:
        MilvusException: Re-raised after logging when connecting or
            selecting the database fails.
    """
    conn_key = f"{host}:{port}/{db_name}"

    if conn_key not in self._connections:
        try:
            # Check if already connected
            if connections.has_connection("default"):
                connections.remove_connection("default")

            # Every cached key maps to the one shared "default" alias, which
            # is about to point at a different server/database. Forget the
            # old keys so a later request for a previous target reconnects
            # instead of reusing the repointed alias.
            self._connections.clear()

            # Connect to Milvus
            connections.connect(
                alias="default",
                host=host,
                port=port,
            )
            logger.info("Connected to Milvus at %s:%s", host, port)

            # Check if database exists, create if not
            existing_dbs = db.list_database()
            if db_name not in existing_dbs:
                db.create_database(db_name)
                logger.info("Created database: %s", db_name)

            # Use the database
            db.using_database(db_name)
            logger.info("Using database: %s", db_name)
            logger.debug(
                "Milvus DB switched to: %s, available collections: %s",
                db_name,
                utility.list_collections(),
            )

            self._connections[conn_key] = "default"

        except MilvusException as e:
            logger.error("Failed to connect to Milvus: %s", e)
            raise

    return self._connections[conn_key]

get_event_loop()

Get or create event loop for current thread.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/singleton_manager.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def get_event_loop(self) -> asyncio.AbstractEventLoop:
    """Get or create a usable event loop for the current thread.

    Returns:
        The loop cached for this thread when it is still open; otherwise the
        thread's current loop, or a freshly created one that is also
        installed as the thread's current loop.
    """
    thread_id = threading.get_ident()

    # A cached loop may have been closed since it was stored, or may belong
    # to a finished thread whose ident was recycled, so it is only reused
    # while it is still open.
    cached_loop = self._event_loops.get(thread_id)
    if cached_loop is not None and not cached_loop.is_closed():
        return cached_loop

    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("Event loop is closed")
    except RuntimeError:
        # No usable loop for this thread: create one and install it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    self._event_loops[thread_id] = loop
    logger.info("Created new event loop for thread %s", thread_id)

    return loop

get_vector_store(collection_name, embedding_model, connection_args)

Get or create a vector store for a collection.

Source code in aiagents4pharma/talk2scholars/tools/pdf/utils/singleton_manager.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def get_vector_store(
    self,
    collection_name: str,
    embedding_model: Embeddings,
    connection_args: Dict[str, Any],
) -> Milvus:
    """Return the vector store for ``collection_name``, building it on first use.

    Stores are cached in ``self._vector_stores`` under the collection name.
    On a cache miss an event loop is ensured for the calling thread, a
    ``Milvus`` store is created from the ``"host"`` and ``"port"`` entries of
    ``connection_args``, and the new store is cached before it is returned.
    """
    stores = self._vector_stores
    if collection_name in stores:
        return stores[collection_name]

    # The store is built right after this call, so make sure the calling
    # thread has an event loop first.
    self.get_event_loop()

    # Pass an explicit URI alongside host/port so LangChain uses the
    # intended host.
    host = connection_args["host"]
    port = connection_args["port"]
    milvus_uri = f"http://{host}:{port}"
    store_connection = {
        "uri": milvus_uri,
        "host": host,
        "port": port,
    }

    new_store = Milvus(
        embedding_function=embedding_model,
        collection_name=collection_name,
        connection_args=store_connection,
        text_field="text",
        auto_id=False,
        drop_old=False,
        consistency_level="Strong",
    )
    stores[collection_name] = new_store

    logger.info(
        "Created new vector store for collection: %s with URI: %s",
        collection_name,
        milvus_uri,
    )
    return new_store