Source code for llmtuner.llms.anyllm

import yaml
from datetime import datetime
import logging

from IPython.display import Markdown, HTML, display

from llmtuner.interface import Interface

class AnyLLMBase(Interface):
    """Base class to handle the interface to AnythingLLM."""

    def __init__(self, config=""):
        """Initialize the interface, passing an LLMTunerConfig, the filepath of a
        configuration file, or no configuration at all."""
        super().__init__("anythingllm", config)
    def syncronize_store(self, updatestore=False):
        """Check consistency between instance and store and update if required."""
        # to be added
        pass
    def get_workspaces_list(self, synchronize=False):
        """Get currently available workspace list from AnythingLLM and synchronize
        with local DB if 'synchronize'=True"""
        response = self._get("workspaces", "")
        wslist = []
        if response.ok:
            if "workspaces" in response.json():
                for entry in response.json()["workspaces"]:
                    if "slug" in entry:
                        wslist.append(entry["slug"])
        if not wslist:
            print("Not getting workspaces from this answer:", response)
        # to do: synchronize
        return wslist
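
# Illustrative usage sketch, not part of the original module: the base class (or any of the
# concrete subclasses defined below) can be used to list the workspaces known to the
# AnythingLLM server, assuming the default LLMTunerConfig points at a reachable server.
def _example_list_workspaces():
    base = AnyLLMBase()                   # default configuration
    print(base.get_workspaces_list())     # prints the slugs of all workspaces on the server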

class AnyLLMBuilder(AnyLLMBase):
    """Interface to create workspaces and embed documents in AnythingLLM"""

    def __init__(self, workspace_slug=""):
        super().__init__()
        self.workspace = None
        self.vectordb = AnyLLMDocuments()
        self.docids = []
        if workspace_slug:
            self.set_workspace(workspace_slug)
    def get_available_documents(self, include_staged=True, include_embedded=True,
                                include_workspace=True,
                                filters={"indexed": {}, "staged": {}, "embedded": {}, "added": {}},
                                common_only=False, docids_only=True):
        """Get list of available documents indexed, staged, embedded in the vector database,
        and, if a workspace is set, added to the workspace, sorted by docid. You can provide
        a dictionary to filter for specific entries by providing column names and values
        for the tables."""
        outputs = {}
        outputs.setdefault("indexed", self.config.store.get_data_from_table("documentindex", filters["indexed"], True))
        if include_staged:
            outputs.setdefault("staged", self.config.store.get_data_from_table("stagingdocuments", filters["staged"], True))
        if include_embedded:
            outputs.setdefault("embedded", self.config.store.get_data_from_table("vectordocsbase", filters["embedded"], True))
        if include_workspace and self.workspace:
            outputs.setdefault("added", self.workspace.meta["documents"])
        elif include_workspace:
            print("You requested a document list from the workspace, but the workspace is not set. Set the workspace first.")
        if docids_only:
            for key in outputs:
                newlist = []
                for entry in outputs[key]:
                    if "docid" in entry:
                        newlist.append(entry["docid"])
                outputs[key] = newlist
        return outputs
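    # Illustrative sketch (not part of the original module) of how the 'filters' argument
    # might be used; "my-workspace" and the "sourceid" filter value are placeholders:
    #
    #     builder = AnyLLMBuilder("my-workspace")
    #     docs = builder.get_available_documents(
    #         filters={"indexed": {"sourceid": "open"}, "staged": {}, "embedded": {}, "added": {}},
    #         docids_only=False,
    #     )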
    def add_document(self, url, metadata={}):
        """Manually add a document to the document index by providing the URL and,
        optionally, additional metadata."""
        manualdoc = {"url": url,
                     "docid": "open:" + url,
                     "sourceid": "open",
                     "sourceformat": "text/html",
                     "download": "webpage"}
        for key in manualdoc:
            if key in metadata:
                manualdoc[key] = metadata[key]
        self.config.store.add_data_to_table("documentindex", manualdoc, ["docid"])
    def stage_documents(self, docids=[], update=True):
        """Download documents locally and prepare them for embedding."""
        availabledocs = self.config.store.get_data_from_table("documentindex", {"docid": docids}, True)
        for docinfo in availabledocs:
            logging.debug("Got docinfo: %s", docinfo)
            talker = False
            logging.debug("Got talkers: %s", self.config.talkers)
            for interface in self.config.talkers:
                if interface == docinfo["sourceid"]:
                    talker = self.config.talkers[interface]()
                    continue
            stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docinfo["docid"]}, True)
            if talker:
                talker.download([docinfo["docid"]])
                logging.info("Downloading document " + docinfo["docid"])
            else:
                logging.warning("Did not find talker to retrieve the document for sourceid %s", docinfo["sourceid"])
    def embed_documents(self, docids=[]):
        """Add a data file to the vector database according to docid."""
        stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docids}, True)
        for entry in stageddocs:
            self.vectordb.upload_document(entry["filepath"], entry["content"], entry["url"],
                                          entry["docid"], entry["contentformat"])
    def move_documents_to_space(self, docids=[]):
        """Add embedded documents to the selected workspace."""
        if self.workspace:
            self.workspace.move_documents(docids_add=docids, docids_remove=[])
        else:
            logging.warning("You cannot add documents to a workspace if the workspace has not been set.")
    def list_documents_in_space(self):
        """List documents added to the workspace."""
        meta = self.workspace._get_metadata_from_server(getfulldocs=True)
        return meta["documents"]
    def remove_documents_from_space(self, docids=[]):
        """Remove documents from the selected workspace (they will still be in the vector database)."""
        if self.workspace:
            self.workspace.move_documents(docids_remove=docids)
        else:
            print("You cannot remove documents from a workspace if the workspace has not been set.")
    def delete_documents(self, docids=[]):
        """Delete documents from the vector database."""
        self.vectordb.remove_documents(docids)
    def unstage_documents(self, docids=[]):
        """Deletes entries from local storage."""
        stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docids}, True)
        for entry in stageddocs:
            if entry["filepath"]:
                self.config.store.delete_local(entry["filepath"])
            self.config.store.delete_data_from_table("stagingdocuments", {"docid": entry["docid"]})
    def unlist_documents(self, docids=[]):
        """Remove documents from the document index."""
        for docid in docids:
            self.config.store.delete_data_from_table("documentindex", {"docid": docid})
    def set_workspace(self, workspaceslug, create_new=False):
        """Set the workspace slug and load the workspace from the server, or set up a new
        workspace if 'create_new' is True."""
        if workspaceslug:
            try:
                self.workspace = AnyLLMSpace(workspaceslug)
            except Exception:
                print("Could not initialize workspace from slug", workspaceslug)
                return
        else:
            print("Please provide a name (slug) for the workspace!")
            return
        if create_new:
            self.workspace.create_on_server()
        newmeta = self.workspace._get_metadata_from_server(True, True)
        if not newmeta:
            print("Could not establish workspace!")
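
# Illustrative usage sketch, not part of the original module: a typical build workflow,
# assuming an AnythingLLM server reachable through the default LLMTunerConfig; the workspace
# slug "demo-space" and the document URL are placeholders.
def _example_build_workspace():
    builder = AnyLLMBuilder()
    builder.set_workspace("demo-space", create_new=True)       # create the workspace if missing
    builder.add_document("https://example.org/article.html")   # register the document in the index
    docid = "open:https://example.org/article.html"            # docid assigned by add_document()
    builder.stage_documents([docid])                           # download the document locally
    builder.embed_documents([docid])                           # push it into the vector database
    builder.move_documents_to_space([docid])                   # attach it to the workspace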

class AnyLLMChatter(AnyLLMBase):
    """Interface to chat with a specific workspace."""

    def __init__(self, workspaceslug="", historylength=5):
        """Chat with a workspace."""
        super().__init__()
        self.workspace = False
        self.chathistory = []
        self.historylength = historylength
        if not workspaceslug:
            logging.warning("Please provide a workspace name to make the chatter work!")
        else:
            self.set_workspace(workspaceslug)
    def set_workspace(self, workspaceslug):
        try:
            self.workspace = AnyLLMSpace(workspaceslug)
        except Exception:
            print("Could not initialize workspace from slug", workspaceslug)
    def chat(self, prompt, return_metadata=False, display_only=False):
        """Get a reply to a prompt from the workspace."""
        if self.workspace:
            reply = self.workspace.chat(prompt, True)
            self.chathistory.append({"prompt": prompt, "reply": reply})
            if len(self.chathistory) > self.historylength:
                del self.chathistory[0]
            if display_only:
                self._format_for_jupyter()
            elif return_metadata:
                return reply
            else:
                return reply["textResponse"]
        else:
            logging.warning("You have not defined a workspace yet, please provide a valid workspace name!")
    def _format_for_jupyter(self, response=""):
        """Make a nice output of the reply and references for a Jupyter notebook."""
        if not response:
            response = self.chathistory[-1]["reply"]
        if type(response) is str:
            display(Markdown(response))
        else:
            display(Markdown(response["textResponse"]))
            if len(response["sources"]) > 0:
                display(Markdown("----\n # References\n"))
                for source in response["sources"]:
                    docid = source["url"][source["url"].rfind("/") + 1:source["url"].rfind("XX")]
                    refentry = self.config.store.get_data_from_table("documentindex", {"docid": docid}, asdicts=True)
                    if len(refentry) > 0:
                        display(Markdown("### " + refentry[0]["url"] + "\n"))
                    display(HTML(source["text"][source["text"].find("/document_metadata>") + 20:len(source["text"])]))
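
# Illustrative usage sketch, not part of the original module: chatting with the placeholder
# workspace slug "demo-space".
def _example_chat():
    chatter = AnyLLMChatter("demo-space")
    answer = chatter.chat("Summarize the embedded documents.")
    print(answer)
    # In a Jupyter notebook, chatter.chat(prompt, display_only=True) renders the reply and
    # its references as Markdown/HTML instead of returning a string.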

class AnyLLMDocuments(AnyLLMBase):
    """Class to manage documents in AnythingLLM's vector database."""

    def __init__(self):
        super().__init__()
    def _get_document(self, documentname):
        """Getting info for one document."""
        self._get("document", documentname)
        if not self.response.ok:
            print("Could not get document")
    def upload_document(self, filepath="", content="", uploadname="", referenceid="", fileformat=""):
        """Upload a file or text content from local storage and update the local
        vector-database table."""
        if not filepath and not content:
            logging.warning("You must provide either a filepath or content to the upload function!")
            return
        if filepath:
            if not fileformat:
                fileformat = "application/pdf"
            uploadname = filepath[filepath.rfind("/") + 1:len(filepath)]
            if referenceid:
                uploadname = referenceid + "XX" + uploadname
            if uploadname.find(".") < 1:
                uploadname += "." + fileformat.split("/")[-1]
            files = {
                "file": (uploadname, open(filepath, 'rb'), fileformat),
            }
        elif content:
            if not fileformat:
                fileformat = "text/html"
            if referenceid:
                uploadname = str(referenceid) + "XX"
            uploadname += "." + fileformat.split("/")[-1]
            logging.info("Uploading: " + referenceid)
            files = {
                "file": (uploadname, content, fileformat),
            }
        self._post("document", "upload", files)
        if self.response.ok:
            item = self.response.json()["documents"][0]
            docinfo = {
                "embeddedid": item["id"],
                "name": item["location"].split("/")[1],
                "docid": str(referenceid),
                "folder": item["location"].split("/")[0],
            }
            self.config.store.add_data_to_table("vectordocsbase", docinfo, update=True)
        else:
            logging.warning("Can't upload file, got error %s", self.response.status_code)
    def get_documents_list(self, synchronize=False):
        """Get currently embedded document list from AnythingLLM and synchronize
        with local DB if 'synchronize'=True"""
        self._get("documents", "")
        doclist = []
        for entry in self.response.json()["localFiles"]["items"]:  # folder
            folder = entry["name"]
            for item in entry["items"]:
                docinfo = {
                    "folder": folder,
                    "embeddedid": item["id"],
                    "name": item["name"],
                    "docid": item["name"].split("XX")[0],
                }
                doclist.append(docinfo)
        if synchronize:
            for entry in doclist:
                self.config.store.add_data_to_table("vectordocsbase", entry, ["docid"], False)
        return doclist
    def _delete_documents(self, docnames):
        """Delete documents from the vector database using their internal AnythingLLM
        names (docnames is a list)."""
        if not isinstance(docnames, list):
            print("Please provide a list of document names, got type", type(docnames))
        else:
            data = {"names": docnames}
            self._delete("system/remove-documents", data)
    def remove_documents(self, docids):
        """Removing documents from the vector database based on their docids."""
        embeddeddocs = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids}, True)
        vector_db_names = []
        for entry in embeddeddocs:
            vector_db_names.append(entry["name"])
        self._delete_documents(vector_db_names)
        for entry in embeddeddocs:
            self.config.store.delete_data_from_table("vectordocsbase", entry)
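
# Illustrative usage sketch, not part of the original module: uploading raw HTML content
# directly to the vector database and listing the embedded documents; the referenceid is
# a placeholder docid.
def _example_manage_documents():
    docs = AnyLLMDocuments()
    docs.upload_document(content="<html><body>Hello</body></html>",
                         referenceid="open:https://example.org/article.html",
                         fileformat="text/html")
    print(docs.get_documents_list())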

# Maps workspace attribute names returned by the server to local metadata keys;
# "exclude" marks attributes that are not stored locally.
workspacemap_server_db = {"id": "modelid", "threads": "exclude", "documents": "exclude"}

class AnyLLMSpace(AnyLLMBase):
    """Class to interact with a workspace/model in AnythingLLM."""

    def __init__(self, spaceslug="", config=None):
        super().__init__()
        self.slug = spaceslug  # Slug of the workspace in AnythingLLM
        self.meta = {}
    def _get_metadata_from_server(self, updateobject=False, getfulldocs=False):
        """Read metadata for the workspace from the AnythingLLM server, and update the
        metadata of this instance. Returns either a list of docids or the full information
        on the included documents."""
        response = self._get("workspace/" + self.slug)
        newmeta = {}
        if response.ok and "workspace" in response.json():
            for entry in response.json()["workspace"]:
                if entry["slug"] != self.slug:
                    continue
                for key in entry.keys():
                    if key in workspacemap_server_db:
                        if workspacemap_server_db[key] != "exclude":
                            newmeta.setdefault(workspacemap_server_db[key], entry[key])
                    else:
                        newmeta.setdefault(key, entry[key])
                if "documents" in entry:
                    if getfulldocs:
                        newmeta.setdefault("documents", entry["documents"])
                    else:
                        docnames = []
                        for doc in entry["documents"]:
                            docnames.append(doc["docpath"].split("/")[1])
                        doclist = self.config.store.get_data_from_table("vectordocsbase", {"name": docnames}, True)
                        docids = []
                        for docentry in doclist:
                            docids.append(docentry["docid"])
                        newmeta["documents"] = docids
        else:
            print("Could not retrieve the workspace", self.slug, "from the server. Check that it exists!")
        if updateobject:
            self.meta = newmeta
        return newmeta
    def _set_metadata_on_server(self, metadata={}):
        if not metadata:
            metadata = self.meta
        response = self._post("workspace/" + self.slug, "update", metadata, True)
        if response.ok and "workspace" in response.json():
            for key in response.json()["workspace"]:
                self.meta.setdefault(key, response.json()["workspace"][key])
    def _get_metadata_from_database(self, updateobject=False):
        localmeta = self.config.store.get_data_from_table("workspaces", {"slug": self.slug}, True)
        if len(localmeta) == 1:
            if updateobject:
                self.meta = localmeta[0]
            return localmeta[0]
    def _set_metadata_to_database(self, metadata={}):
        """Set the local database entry from the object metadata."""
        if not metadata and not self.meta:
            print("Got no metadata to set, nothing will be written.")
        if metadata:
            self.config.store.add_data_to_table("workspaces", metadata, ["slug"])
            print("Adding data to workspaces", metadata)
        elif self.meta:
            self.config.store.add_data_to_table("workspaces", self.meta, ["slug"])
            print("Adding data to workspaces", self.meta)
    @classmethod
    def from_file(cls, filepath, config, initialize_on_server=False):
        # should be read from DB
        newone = cls()
        with open(filepath, 'r') as yaml_file:
            newone.meta = yaml.load(yaml_file, Loader=yaml.FullLoader)
        return newone
    def get_metadata(self, printit=False):
        """Display the metadata of the current workspace. If 'printit' is True, the info
        will be printed, not returned."""
        if printit:
            for key in self.meta:
                print(key, ":", self.meta[key])
        else:
            return self.meta
    def write(self, filepath):
        """Write the configuration of a workspace to a local YAML file."""
        with open(filepath, 'w') as yaml_file:
            yaml.dump(self.meta, yaml_file, default_flow_style=False)
    def create_on_server(self):
        """Create a new workspace from the local metadata."""
        response = False
        if not self.meta and not self.slug:
            print("No metadata for creation found. You have to set the metadata first!")
        else:
            if self.slug:
                if self.slug not in self.get_workspaces_list():
                    data = {"name": str(self.slug)}
                    response = self._post("workspace", "new", data, True)
                else:
                    print(self.slug, "is already an existing workspace, will not add.")
            else:
                print("Did not find 'slug' in metadata. Needed to create a workspace.")
        if response:
            if response.ok:
                if "workspace" in response.json():
                    updatedic = {}
                    if type(response.json()["workspace"]) is not dict:
                        return response
                    for key in response.json()["workspace"]:
                        if key.find("At") < 0 and key in self.meta:
                            updatedic.setdefault(key, self.meta[key])
                    self._set_metadata_on_server(updatedic)
            else:
                return response
    def delete_from_server(self):
        """Remove a workspace from the server."""
        if self.slug:
            self._delete("workspace/" + self.slug)
            if self.response.ok:
                print("Deleted", self.slug)
                self.slug = ""
            else:
                print("Error deleting " + self.slug + ". Error", self.response.status_code)
    def set_workspacename(self, slug):
        """Set the name of the workspace and check the existence of a configuration."""
        self.slug = slug
    def chat(self, prompt, provide_metadata=False):
        """Use the workspace for chatting."""
        message = {"message": prompt, "mode": "chat"}
        logging.info("Querying " + self.slug + ": " + prompt)
        response = self._post("workspace/" + self.slug, "chat", message, "application/json", "json")
        if response.ok:
            if "type" in response.json():
                if provide_metadata:
                    return response.json()
                if response.json()["type"] == "textResponse":
                    return response.json()["textResponse"]
                else:
                    logging.warning("Did not get a response of type 'textResponse' but %s. Aborted.",
                                    response.json()["type"])
                    return False
            else:
                logging.warning("Did not find the required 'type' in the response. Aborted.")
                return False
        else:
            logging.warning("Response not OK: got error status %s", response.status_code)
            return False
    def move_documents(self, docids_add=[], docids_remove=[]):
        """Add documents to and/or remove documents from the workspace."""
        docs_add = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids_add}, True)
        docs_remove = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids_remove}, True)
        logging.info("Moving " + str(len(docs_add)) + " document(s) to the workspace and removing " + str(len(docs_remove)))
        update = {"adds": [], "deletes": []}
        for entry in docs_add:
            update["adds"].append(entry["folder"] + "/" + entry["name"])
        for entry in docs_remove:
            update["deletes"].append(entry["folder"] + "/" + entry["name"])
        logging.debug("Updates list: " + str(update))
        response = self._post("workspace/" + self.slug, "update-embeddings", update, passdata="json")
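
# Illustrative usage sketch, not part of the original module: creating a workspace directly
# through AnyLLMSpace and chatting with it; "demo-space" is a placeholder slug.
def _example_space():
    space = AnyLLMSpace("demo-space")
    space.create_on_server()                            # create the workspace if it does not exist yet
    space._get_metadata_from_server(updateobject=True)  # cache the server-side metadata locally
    print(space.chat("Which documents do you know about?"))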