# Source code for llmtuner.interface

import logging
import requests
from requests.auth import HTTPBasicAuth

from llmtuner.config import LLMTunerConfig

logging.basicConfig(level=logging.INFO)

class Interface:
    """Base class to handle an interface to a server or a web page.

    The class stores the connection settings of one interface (base URL,
    API key, basic-auth credentials, default headers) and offers thin
    wrappers around ``requests`` for GET, POST and DELETE calls. The most
    recent response of any call is kept in ``self.response``.
    """

    def __init__(self, interfaceid="", config=""):
        """Initialize the interface.

        Args:
            interfaceid: Name of the interface; used to look up its settings
                in the ``servers`` section of the configuration.
            config: Either an ``LLMTunerConfig`` object, the file path of a
                configuration (``str``), or ``""`` for the default
                configuration. Any other value is rejected with a warning
                and the interface is left without a configuration object.
        """
        self.interfaceid = interfaceid
        if isinstance(config, str):
            # An empty string selects the default configuration.
            self.config = LLMTunerConfig(config) if config else LLMTunerConfig()
        elif isinstance(config, LLMTunerConfig):
            self.config = config
        else:
            # Keep the attribute defined so later lookups do not fail with
            # an AttributeError on top of the warning below.
            self.config = None
            logging.warning(
                "Did not know how to handle configuration that you passed to class. "
                "Please provide filepath to configuration or a LLMTunerConfig object."
            )
        self.baseurl = ""  # URL to API endpoint or main webpage
        self.authkey = ""  # authentication key for API key access
        self.basicauth = None  # (username, password) for basic authentication
        self.headers = {}  # default headers sent with every request
        self._set_config()

    def _set_config(self, configdict=None):
        """Set the interface settings from a dictionary or the configuration.

        Args:
            configdict: Optional settings dictionary. Recognized keys are
                ``api_key``, ``base_url`` and the pair ``username`` /
                ``password``. When empty or omitted, the settings are read
                from ``self.config.params.servers.<interfaceid>``.
        """
        if not self.interfaceid and not configdict:
            print ("Need either interfaceid or configuration passed to work. Interface cannot be used like this.")
            return

        params = {}
        if configdict:
            params = configdict
        elif self.config is not None and hasattr(self.config.params.servers, self.interfaceid):
            params = vars(getattr(self.config.params.servers, self.interfaceid))

        if not params:
            print ("Did not find configuration for interface!")
            return

        if "api_key" in params:
            self.authkey = params["api_key"]
            # Setting an API key replaces the default headers as a whole.
            self.headers = {
                'Authorization': f'Bearer {self.authkey}'
            }
        if "base_url" in params:
            self.baseurl = params["base_url"]
        if "password" in params and "username" in params:
            self.basicauth = (params["username"], params["password"])

    def update_config(self, valuedict):
        """Update the interface settings from a dictionary and store them.

        ``valuedict`` should contain the parameters of the interface, e.g.
        ``{'base_url': 'https://...', 'api_key': 'your_key_...',
        'interfaceid': 'name_of_interface', ...}``. The dictionary passed in
        is not modified.

        Args:
            valuedict: Settings to apply. ``interfaceid`` is required unless
                the interface already has one.
        """
        if 'interfaceid' not in valuedict and not self.interfaceid:
            print ("You need to provide an interfaceid to set the interface configuration!")
            return

        if 'interfaceid' in valuedict:
            self.interfaceid = valuedict["interfaceid"]
        self._set_config(valuedict)

        # The interface id is the key under which the settings are stored,
        # not a setting itself. Work on a copy so the caller's dictionary
        # stays untouched.
        settings = {key: value for key, value in valuedict.items() if key != 'interfaceid'}
        if self.config is None:
            logging.warning("No configuration object available, settings were not stored.")
            return
        self.config._update_parameters(self.interfaceid, settings)

    def _make_header(self, add_dict=None, delete_list=None):
        """Return a copy of the default headers with entries removed/added.

        Args:
            add_dict: Entries to add. An entry never overrides a default
                header that is kept; list the key in ``delete_list`` as well
                to replace it.
            delete_list: Keys of default headers to leave out.

        Returns:
            A new ``dict``; ``self.headers`` is not modified.
        """
        add_dict = add_dict or {}
        delete_list = delete_list or []
        header = {
            key: value
            for key, value in self.headers.items()
            if key not in delete_list
        }
        for key, value in add_dict.items():
            header.setdefault(key, value)
        return header

    def _build_get_url(self, endpoint, query="", params=None, set_query_from_params=False):
        """Assemble the URL for a GET request (no network access)."""
        if endpoint and not endpoint.endswith("/") and query:
            endpoint += "/"
        # Only a full URL bypasses the base URL; a relative endpoint that
        # merely contains "http" somewhere must still be prefixed.
        baseurl = "" if endpoint.startswith(("http://", "https://")) else self.baseurl
        if set_query_from_params:
            # str.rstrip returns a new string, so the result must be kept.
            endpoint = endpoint.rstrip("/")
            # NOTE(review): keys and values are inserted verbatim, they are
            # not percent-encoded here.
            query = "?" + "&".join(
                f"{key}={value}" for key, value in (params or {}).items()
            )
        return baseurl + endpoint + query

    def _get(self, endpoint, query="", params=None, set_query_from_params=False):
        """Basic GET from an endpoint.

        Args:
            endpoint: Path relative to ``self.baseurl`` or a full URL.
            query: String appended to the endpoint (separated by ``/``).
            params: Parameters of the request. They are sent as JSON body
                unless ``set_query_from_params`` is set.
            set_query_from_params: If true, ``params`` is turned into a
                ``?key=value&...`` query string that replaces ``query``.

        Returns:
            The ``requests.Response`` on success, ``False`` when the server
            answered with an error status.
        """
        header = self._make_header({'Content-Type': 'application/json', "Accept": "application/json"})
        url = self._build_get_url(endpoint, query, params, set_query_from_params)

        # The default headers carry the API key, so they are sent with
        # every variant of the request.
        request_args = {"headers": header}
        # NOTE(review): certificate verification is switched off in the two
        # branches below that pass verify=False - confirm this is still
        # required for the servers in use.
        if self.basicauth and not params:
            request_args["auth"] = self.basicauth
        elif params and set_query_from_params:
            request_args["auth"] = self.basicauth
        elif params:
            request_args.update(json=params, auth=self.basicauth, verify=False)
        else:
            request_args["verify"] = False

        self.response = requests.get(url, **request_args)
        if self.response.ok:
            return self.response
        logging.warning("Request failed.")
        return False

    def _post(self, endpoint, query, data, content_type="", passdata="files"):
        """Basic POST to the server.

        Args:
            endpoint: Path relative to ``self.baseurl``.
            query: String appended to the endpoint after a ``/``.
            data: Payload of the request.
            content_type: Value of the ``Content-Type`` header; a boolean is
                treated as ``"application/json"``, an empty string sends no
                ``Content-Type`` header.
            passdata: Name of the ``requests.post`` argument that receives
                ``data``: ``"data"``, ``"json"`` or ``"files"``.

        Returns:
            The ``requests.Response`` (also for error statuses), or ``None``
            if ``passdata`` is unknown and no request was sent.
        """
        if endpoint.endswith("/"):
            endpoint = endpoint[:-1]
        if isinstance(content_type, bool):
            content_type = "application/json"
        if content_type:
            header = self._make_header({'Content-Type': content_type, "Accept": "application/json"})
        else:
            header = self._make_header({"Accept": "application/json"})
        url = self.baseurl + endpoint + "/" + query

        # Rendering a large payload as text is costly, only do it when the
        # debug output is actually emitted.
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            logging.debug("POSTING to %s", url)
            # NOTE(review): the header may contain the API key.
            logging.debug("-- header: %s", header)
            logging.debug("-- data: %s", str(data)[:300])
            logging.debug("-- passdata: %s", passdata)

        response = None
        if passdata in ("data", "json", "files"):
            # passdata is the keyword under which requests receives the payload.
            response = requests.post(url, headers=header, **{passdata: data})
        else:
            logging.warning("Did not find option for 'passdata': %s. Could not post request.", passdata)

        # Never leave the response of an earlier call in place.
        self.response = response
        # A Response is falsy for error statuses, so test for None explicitly.
        if response is None:
            logging.warning("Request failed, did not get response")
        elif not response.ok:
            logging.warning("Got faulty result, status: %s", response.status_code)
        return response

    def _delete(self, endpoint, data=None):
        """Basic DELETE command to the API.

        Args:
            endpoint: Path relative to ``self.baseurl``.
            data: Optional payload, sent as JSON body.

        Returns:
            The ``requests.Response`` of the call.
        """
        url = self.baseurl + endpoint
        if data:
            header = self._make_header({'Content-Type':'application/json', 'Accept': 'application/json'})
            self.response = requests.delete(url, headers=header, json=data)
            print ("deleting", url, "with", data)
        else:
            header = self._make_header({"Accept": "*/*"})
            self.response = requests.delete(url, headers=header)
        return self.response