import logging
import requests
from requests.auth import HTTPBasicAuth
from llmtuner.config import LLMTunerConfig
logging.basicConfig(level=logging.INFO)
[docs]
class Interface:
"""Base class to handle an interfaces to servers or webpages."""
def __init__(self, interfaceid = "", config = ""):
"""Initialize Chatter passing LLMTunerConfig, filepath of the configuration or no configuration at all."""
[docs]
self.interfaceid = interfaceid
if config == "":
self.config = LLMTunerConfig()
elif type(config) == str:
self.config = LLMTunerConfig(config)
elif type(config) == LLMTunerConfig:
self.config = config
else:
logging.warning("Did not know how to handle configuration that you passed to class. "+
"Please provide filepath to configuration or a LLMTunerConfig object.")
[docs]
self.baseurl = "" # URL to API endpoint or main webpage
[docs]
self.authkey = "" # authentication key for API key access
[docs]
self.basicauth = None # for username/password authentication
self._set_config()
[docs]
def _set_config(self, configdict = {}):
"""Set configuration of interface if in cofiguration"""
if not self.interfaceid and not configdict:
print ("Need either interfaceid or configuration passed to work. Interface cannot be used like this.")
else:
params = {}
if configdict:
params = configdict
else:
if self.interfaceid and hasattr(self.config.params.servers, self.interfaceid):
params = getattr(self.config.params.servers, self.interfaceid).__dict__
if params:
if "api_key" in params:
self.authkey = params["api_key"]
self.headers = {
'Authorization': f'Bearer {self.authkey}'
}
if "base_url" in params:
self.baseurl = params["base_url"]
if "password" in params and "username" in params:
self.basicauth = (params["username"], params["password"])
else:
print ("Did not find configuration for interface!")
[docs]
def update_config(self, valuedict):
"""Updates interface configuration from dictionary. Should contain parameters for configuration:
{'base_url': 'https://...', 'api_key': 'your_key_ ...', 'interfaceid': 'name_of_interface' ...}
"""
if not 'interfaceid' in valuedict and not self.interfaceid:
print ("You need to provide an interfaceid to set the interface configuration!")
else:
if 'interfaceid' in valuedict:
self.interfaceid = valuedict["interfaceid"]
self._set_config(valuedict)
if 'interfaceid' in valuedict:
del valuedict['interfaceid']
self.config._update_parameters(self.interfaceid, valuedict)
[docs]
def _get(self, endpoint, query = "", params = {}, set_query_from_params = False):
"""Basic get from endpoint. If 'set_query_from_params', the query will be produced with key-value pairs as direct part of the URL."""
header = self._make_header({'Content-Type': 'application/json', "Accept": "application/json"})
if endpoint:
if endpoint[-1] != "/" and query:
endpoint = endpoint + "/"
baseurl = ""
if endpoint.find("http")<0:
baseurl = self.baseurl
if set_query_from_params:
query = "?"
for key in params:
query += key+"="+str(params[key]) + "&"
query.rstrip("&")
endpoint.rstrip("/")
if self.basicauth and not params:
self.response = requests.get(baseurl+endpoint+query,
headers=header, auth = self.basicauth)
elif params and set_query_from_params:
self.response = requests.get(baseurl+endpoint+query,
auth = self.basicauth)
elif params:
self.response = requests.get(baseurl+endpoint+query,
json = params, auth = self.basicauth, verify=False)
else:
self.response = requests.get(baseurl+endpoint+query,
headers=header, verify=False)
if self.response.ok:
return self.response
else:
logging.warning("Request failed.")
return False
[docs]
def _post(self, endpoint, query, data, content_type = "", passdata = "files"):
"""Basic post to server for given endpoint, query string and accompanying data,
if 'usedata' the data is passed to the 'data' argument."""
if endpoint[-1] == "/":
endpoint = endpoint[0:len(endpoint)-1]
if type(content_type) is bool:
content_type = "application/json"
if content_type:
header = self._make_header({'Content-Type': content_type, "Accept": "application/json"})
else:
header = self._make_header({"Accept": "application/json"})
maxlength = 300
if len(str(data))<300:
maxlength = len(str(data))
logging.debug("POSTING to "+ self.baseurl+endpoint+"/"+query)
logging.debug("-- header: "+ str(header))
logging.debug("-- data: "+ str(data)[0:maxlength])
logging.debug("-- passdata: "+ str(passdata))
if passdata == "data":
self.response = requests.post(self.baseurl+endpoint+"/"+query, data = data, headers=header)
elif passdata == "json":
self.response = requests.post(self.baseurl+endpoint+"/"+query, json = data, headers=header)
elif passdata == "files":
self.response = requests.post(self.baseurl+endpoint+"/"+query, files = data, headers=header)
else:
logging.warning("Did not find option for 'passdata': "+passdata+". Could not post request.")
if self.response:
if not self.response.ok:
logging.warning("Got faulty result, status:", self.response.status_code)
else:
logging.warning("Request failed, did not get response")
return self.response
[docs]
def _delete(self, endpoint, data = {}):
"""Basic delete command to API"""
headeradd = {"Accept": "*/*"}
if data:
headeradd = {'Content-Type':'application/json', 'Accept': 'application/json'}
else:
headeradd = {"Accept": "*/*"}
header = self._make_header(headeradd)
if data:
self.response = requests.delete(self.baseurl+endpoint, headers=header, json = data)
print ("deleting", self.baseurl+endpoint, "with", data)
else:
self.response = requests.delete(self.baseurl+endpoint, headers=header)