import yaml
from datetime import datetime
import logging
from IPython.display import Markdown, HTML, display
from llmtuner.interface import Interface
class AnyLLMBase(Interface):
"""Base class to handle interface to AnythingLLM"""
def __init__(self, config = ""):
"""Initialize Chatter passing LLMTunerConfig, filepath of the configuration or no configuration at all."""
super().__init__("anythingllm", config)
    def synchronize_store(self, updatestore = False): # to be added
"""Check consistency between instance and store and update if required."""
pass
def get_workspaces_list(self, synchronize = False):
"""Get currently available workspace list from AnythingLLM and synchronize with local DB if 'synchronize'=True"""
response = self._get("workspaces", "")
wslist = []
if response.ok:
if "workspaces" in response.json():
for entry in response.json()["workspaces"]:
if "slug" in entry:
wslist.append(entry["slug"])
if not wslist:
print ("Not getting workspaces from this answer:", response)
# to do: synchronize
return wslist
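# Example usage (a minimal sketch, not part of the module): list the workspace slugs
# currently available on the AnythingLLM server. Any subclass (AnyLLMBuilder,
# AnyLLMChatter, ...) inherits this call.
#
#     base = AnyLLMBase()
#     print(base.get_workspaces_list())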
class AnyLLMBuilder(AnyLLMBase):
"""Interface to create workspaces and embed documents in AnythingLLM"""
def __init__(self, workspace_slug = ""):
super().__init__()
self.vectordb = AnyLLMDocuments()
if workspace_slug:
self.set_workspace(workspace_slug)
def get_available_documents(self, include_staged = True, include_embedded = True, include_workspace = True,
filters = {"indexed": {}, "staged": {}, "embedded": {}, "added": {}},
common_only = False, docids_only = True):
"""Get list of available documents indexed, staged, embedded in vector database, and, if workspace is set,
added to the workspace, sorted by docid. You can provide a dictionary to filter for specific entries
by providing column names and values for the tables."""
outputs = {}
outputs.setdefault("indexed", self.config.store.get_data_from_table("documentindex", filters["indexed"], True))
if include_staged:
outputs.setdefault("staged", self.config.store.get_data_from_table("stagingdocuments", filters["staged"], True))
if include_embedded:
outputs.setdefault("embedded", self.config.store.get_data_from_table("vectordocsbase", filters["embedded"], True))
if include_workspace and self.workspace:
outputs.setdefault("added", self.workspace.meta["documents"])
elif include_workspace:
print ("You requested a document list from the workspace, but the workspace is not set. Set the workspace first.")
if docids_only:
for key in outputs:
newlist = []
for entry in outputs[key]:
if "docid" in entry:
newlist.append(entry["docid"])
outputs[key] = newlist
return outputs
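    # Example (illustrative sketch; "demo-space" and the sourceid "arxiv" are
    # hypothetical values): restrict the indexed table to one source and return
    # only docids for each category.
    #
    #     builder = AnyLLMBuilder("demo-space")
    #     docs = builder.get_available_documents(
    #         filters={"indexed": {"sourceid": "arxiv"}, "staged": {}, "embedded": {}, "added": {}},
    #         docids_only=True)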
def add_document(self, url, metadata = {}):
"""Manually add a document to the document index by providing the url and optionally additional metadata."""
manualdoc = {"url": url, "docid": "open:"+url,
"sourceid": "open", "sourceformat": "text/html", "download": "webpage"
}
for key in manualdoc:
if key in metadata:
manualdoc[key] = metadata[key]
self.config.store.add_data_to_table("documentindex", manualdoc, ["docid"])
def stage_documents(self, docids = [], update = True):
"""Download documents locally and prepare for embedding"""
availabledocs = self.config.store.get_data_from_table("documentindex", {"docid": docids}, True)
for docinfo in availabledocs:
logging.debug("Got docinfo", docinfo)
talker = False
logging.debug("Got talkers", self.config.talkers)
for interface in self.config.talkers:
if interface == docinfo["sourceid"]:
talker = self.config.talkers[interface]()
                    break
stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docinfo["docid"]}, True)
if talker:
talker.download([docinfo["docid"]])
logging.info("Downloading document "+docinfo["docid"])
else:
logging.warning("Did not find talker to retrieve the document for sourceid", docinfo["sourceid"])
def embed_documents(self, docids = []):
"""Add a datafile to the vector database according to docid"""
stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docids}, True)
for entry in stageddocs:
self.vectordb.upload_document(entry["filepath"], entry["content"], entry["url"], entry["docid"], entry["contentformat"])
def move_documents_to_space(self, docids=[]):
"""Adding embedded documents to the selected space"""
if self.workspace:
self.workspace.move_documents(docids_add = docids, docids_remove = [])
else:
logging.warning("You cannot add documents to a workspace if the space is not defined before.")
def list_documents_in_space(self):
"""List documents added to the workspace"""
meta = self.workspace._get_metadata_from_server(getfulldocs = True)
return meta["documents"]
def remove_documents_from_space(self, docids=[]):
"""Removing documents from the selected space (will still be in the database)"""
if self.workspace:
self.workspace.move_documents(docids_remove = docids)
else:
print ("You cannot add documents to a workspace if the space is not defined before.")
def delete_documents(self, docids = []):
"""Delete documents from the vector database"""
self.vectordb.remove_documents(docids)
def unstage_documents(self, docids = []):
"""Deletes entries from local storage"""
stageddocs = self.config.store.get_data_from_table("stagingdocuments", {"docid": docids}, True)
for entry in stageddocs:
if entry["filepath"]:
self.config.store.delete_local(entry["filepath"])
self.config.store.delete_data_from_table("stagingdocuments", {"docid": entry["docid"]})
def unlist_documents(self, docids = []):
"""Remove a document from the document index."""
for docid in docids:
self.config.store.delete_data_from_table("documentindex", {"docid": docid})
def set_workspace(self, workspaceslug, create_new = False):
"""Set the workspace slug and load workspace from server, or set up a new workspace if 'create_new' = True"""
if workspaceslug:
try:
self.workspace = AnyLLMSpace(workspaceslug)
            except Exception:
print ("Could not initialize workspace from slug", workspaceslug)
return
        else:
            print("Please provide a name (slug) for the workspace!")
            return
if create_new:
self.workspace.create_on_server()
newmeta = self.workspace._get_metadata_from_server(True, True)
if not newmeta:
print ("Could not establish workspace!")
class AnyLLMChatter(AnyLLMBase):
"""Interface to chat with a specific workspace."""
def __init__(self, workspaceslug = "", historylength = 5):
"""Chat with a workspace"""
super().__init__()
self.chathistory = []
self.historylength = historylength
if not workspaceslug:
logging.warning("Please provide a workspace name to make the chatter work!")
else:
self.set_workspace(workspaceslug)
    def set_workspace(self, workspaceslug):
        """Set the workspace slug and load the workspace from the server."""
try:
self.workspace = AnyLLMSpace(workspaceslug)
        except Exception:
print ("Could not initialize workspace from slug", workspaceslug)
def chat(self, prompt, return_metadata = False, display_only = False):
"""Get a reply to a prompt from the workspace"""
if self.workspace:
reply = self.workspace.chat(prompt, True)
self.chathistory.append({"prompt": prompt, "reply": reply})
if len(self.chathistory) > self.historylength:
del self.chathistory[0]
if display_only:
self._format_for_jupyter()
else:
if return_metadata:
return reply
else:
return reply["textResponse"]
else:
logging.warning("You have not defined a workspace yet, please provide a valid workspace name!")
class AnyLLMDocuments(AnyLLMBase):
"""Class to manage documents in Anything LLM with vector database."""
def __init__(self):
super().__init__()
def _get_document(self, documentname):
"""Getting info for one document."""
self._get("document", documentname)
if not self.response.ok:
print ("Could not get document")
def upload_document(self, filepath = "", content = "", uploadname = "", referenceid = "", fileformat=""):
"""Uploads a file or text content from local storage and updates vectordatabase local table."""
if not filepath and not content:
logging.warning("You must provide either a filepath or content to the upload function!")
elif filepath:
if not fileformat:
fileformat = "application/pdf"
            uploadname = filepath[filepath.rfind("/")+1:]
if referenceid:
uploadname = referenceid + "XX" + uploadname
if uploadname.find(".")<1:
uploadname += "." + fileformat.split("/")[-1]
files = {
"file": (uploadname, open(filepath, 'rb'), fileformat),
}
elif content:
if not fileformat:
fileformat = "text/html"
if referenceid:
uploadname = str(referenceid) + "XX"
uploadname += "." + fileformat.split("/")[-1]
logging.info("Uploading: "+referenceid)
files = {
"file": (uploadname, content, fileformat),
}
self._post("document", "upload", files)
if self.response.ok:
item = self.response.json()["documents"][0]
docinfo = {
"embeddedid": item["id"],
"name": item["location"].split("/")[1],
"docid": str(referenceid),
"folder": item["location"].split("/")[0]
}
self.config.store.add_data_to_table("vectordocsbase", docinfo, update=True)
else:
logging.warning("Can't upload file, got error", self.response.status_code)
def get_documents_list(self, synchronize = False):
"""Get currently embedded document list from AnythingLLM and synchronize with local DB if 'synchronize'=True"""
self._get("documents", "")
doclist = []
for entry in self.response.json()["localFiles"]["items"]: #folder
folder = entry["name"]
for item in entry["items"]:
docinfo = {
"folder": folder,
"embeddedid": item["id"],
"name": item["name"],
"docid": item["name"].split("XX")[0]
}
doclist.append(docinfo)
if synchronize:
for entry in doclist:
self.config.store.add_data_to_table("vectordocsbase", entry, ["docid"], False)
return doclist
def _delete_documents(self, docnames):
"""Deleting documents from the vector database with the internal anythingllm name (docnames [])."""
if not type(docnames) is list:
print ("please provide a list of document names, now got type", type(docnames))
else:
data = {"names": docnames}
self._delete("system/remove-documents", data)
def remove_documents(self, docids):
"""Removing documents from the vector database based on their docids"""
embeddeddocs = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids}, True)
vector_db_names = []
for entry in embeddeddocs:
vector_db_names.append(entry["name"])
self._delete_documents(vector_db_names)
for entry in embeddeddocs:
self.config.store.delete_data_from_table("vectordocsbase", entry)
workspacemap_server_db = {"id": "modelid", "threads": "exclude", "documents": "exclude"}
class AnyLLMSpace(AnyLLMBase):
"""Class to interact with space/model in Anything LLM"""
def __init__(self, spaceslug = "", config = None):
super().__init__()
self.slug = spaceslug # Slug of the workspace in AnythingLLM
    @classmethod
    def from_file(cls, filepath, config, initialize_on_server = False): # should be read from DB
        """Create a workspace object from a local yaml configuration file."""
        newone = cls()
with open(filepath, 'r') as yaml_file:
newone.meta = yaml.load(yaml_file, Loader=yaml.FullLoader)
return newone
def write(self, filepath):
"""Write configuration of a workspace to a local yaml file"""
with open(filepath, 'w') as yaml_file:
yaml.dump(self.meta, yaml_file, default_flow_style=False)
def create_on_server(self):
"""Create a new workspace from the local metadata"""
response = False
if not self.meta and not self.slug:
print ("No metadata for creation found. You have to set the metadata first!")
else:
if self.slug:
                if self.slug not in self.get_workspaces_list():
data = {"name": str(self.slug)}
response = self._post("workspace", "new", data, True)
else:
                    print(self.slug, "is already an existing workspace, will not add.")
else:
print ("Did not find 'slug' in metadata. Needed to create a workspace.")
if response:
if response.ok:
if "workspace" in response.json():
updatedic = {}
if type(response.json()["workspace"]) is not dict:
return response
for key in response.json()["workspace"]:
if key.find("At")<0 and key in self.meta:
updatedic.setdefault(key, self.meta[key])
self._set_metadata_on_server(updatedic)
else:
return response
def delete_from_server(self):
"""Remove a workspace from the server"""
if self.slug:
self._delete("workspace/"+self.slug)
if self.response.ok:
print ("Deleted", self.slug)
self.slug = ""
else:
print ("Error deleting "+self.slug+". Error", self.response.status_code)
def set_workspacename(self, slug):
"""Set the name of the workspace and check existance of configuration"""
self.slug = slug
def chat(self, prompt, provide_metadata = False):
"""Use the workspace for chatting"""
message = {"message": prompt,
"mode": "chat"
}
logging.info("Querying "+self.slug+": "+prompt)
response = self._post("workspace/"+self.slug, "chat", message, "application/json", "json")
if response.ok:
if "type" in response.json():
if provide_metadata:
return response.json()
if response.json()["type"] == "textResponse":
return response.json()["textResponse"]
else:
logging.warning("Did not get response of type 'textResponse', but", response.json()["type"],". Aborted.")
return False
else:
logging.warning("Did not find required 'type' in response. Aborted.")
return False
else:
logging.warning("Response not OK: Got error status", response.status_code)
return False
def move_documents(self, docids_add = [], docids_remove = []):
"""Add documents to the workspace"""
docs_add = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids_add}, True)
docs_remove = self.config.store.get_data_from_table("vectordocsbase", {"docid": docids_remove}, True)
logging.info("Moving "+str(len(docs_add))+" to workspace and removing "+str(len(docs_remove)))
update = {"adds": [], "deletes": []}
for entry in docs_add:
update["adds"].append(entry["folder"]+"/"+entry["name"])
for entry in docs_remove:
update["deletes"].append(entry["folder"]+"/"+entry["name"])
logging.debug("Updates list: "+str(update))
        response = self._post("workspace/"+self.slug,"update-embeddings", update, passdata = "json")
        return response
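# Example low-level workspace handling with AnyLLMSpace (illustrative sketch;
# "demo-space.yaml" and the slug are hypothetical placeholders): load workspace
# metadata from a yaml file, create the workspace on the server, and chat with it.
#
#     space = AnyLLMSpace.from_file("demo-space.yaml", config=None)
#     space.set_workspacename("demo-space")
#     space.create_on_server()
#     print(space.chat("Which documents do you know about?"))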