import logging
import re
import sqlite3
from functools import lru_cache
from typing import List

import gradio as gr
import pandas as pd
from faiss import write_index, read_index
from fastapi import FastAPI
from langchain.document_loaders import DataFrameLoader
from langchain.document_loaders import TextLoader
from langchain.document_loaders import UnstructuredFileLoader
from langchain.document_loaders import UnstructuredURLLoader
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.pydantic_v1 import BaseModel
from langchain.schema.embeddings import Embeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from nltk.corpus import stopwords
from sentence_transformers import SentenceTransformer
from unidecode import unidecode

logger = logging.getLogger(__name__)

# Path of the sentence-transformers model used for every embedding.
model = "embeddings/all-mpnet-base-v2"
# Version tag joined with the model path in the "version" output.
entrenamiento = "V0.0"


class CustomEmbedding(Embeddings, BaseModel,):
    """Embedding model that preprocesses text before encoding it.

    Encoding is delegated to the module-level ``emb`` object, which must be
    assigned before any method of this class is called.
    """

    def _get_embedding(self, text) -> List[float]:
        """Clean *text* (punctuation and stop words removed) and encode it."""
        text = remove_unwanted(text, punctuationOK=True, stopOK=True)
        # NOTE(review): returns whatever ``emb.encode`` yields (presumably a
        # numpy array rather than a plain list) -- confirm if callers care.
        return emb.encode(text)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding per entry of *texts*, in the same order."""
        return [self._get_embedding(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """Return the embedding of a single query string."""
        return self._get_embedding(text)


# Compiled once at import time instead of on every remove_emoji() call.
_EMOJI_PATTERN = re.compile(
    "["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    u"\U00002702-\U000027B0"
    u"\U000024C2-\U0001F251"
    "]+",
    flags=re.UNICODE,
)

# Maps each punctuation sign handled by remove_unwanted() to a single space.
_PUNCTUATION_TABLE = str.maketrans(
    {sign: " " for sign in [".", ",", "!", "¿", "?", "=", "(", ")"]}
)


def remove_emoji(string):
    """Replace every run of characters matched by the emoji pattern with a space."""
    return _EMOJI_PATTERN.sub(r' ', string)


@lru_cache(maxsize=1)
def _spanish_stopwords():
    """Return the NLTK Spanish stop-word set, loaded once and then cached."""
    return frozenset(stopwords.words('spanish'))


def remove_unwanted(document, stopOK=False, punctuationOK=False, xtrasOK=False,
                    emojiOk=False, unidecodeOK=False):
    """Normalize *document* before it is embedded.

    Each flag enables one cleaning step; the steps run in the order listed.

    Args:
        document: Text to clean.
        stopOK: Drop Spanish stop words (compared before lowercasing, so
            capitalized stop words are kept).
        punctuationOK: Replace ``. , ! ¿ ? = ( )`` with spaces.
        xtrasOK: Strip user mentions, URLs and hashtags.
        emojiOk: Replace emoji with spaces.
        unidecodeOK: Transliterate the text with ``unidecode``.

    Returns:
        The cleaned text, stripped and lowercased.
    """
    if punctuationOK:
        document = document.translate(_PUNCTUATION_TABLE)
    if xtrasOK:
        # remove user mentions
        document = re.sub("@[A-Za-z0-9_]+", " ", document)
        # remove URLs
        document = re.sub(r'http\S+', ' ', document)
        # remove hashtags
        document = re.sub("#[A-Za-z0-9_]+", "", document)
    if emojiOk:
        document = remove_emoji(document)
    if unidecodeOK:
        document = unidecode(document)
    if stopOK:
        stop_words = _spanish_stopwords()
        document = " ".join(
            word for word in document.split(" ") if word not in stop_words
        )
    # NOTE(review): as written this removes EVERY space, gluing all words
    # together; the intent was probably to collapse double spaces. It is kept
    # unchanged because the persisted FAISS indexes were built with this exact
    # preprocessing -- changing it requires rebuilding those indexes.
    document = document.replace(' ', "")
    return document.strip().lower()


def loadmodelEmb(model_name="embeddings/all-MiniLM-L6-v2", model_kwargs={'device': 'cpu'}):
    """Load and return the SentenceTransformer stored at *model_name*.

    ``model_kwargs`` is accepted for compatibility but is currently ignored
    (it is never read or mutated).
    """
    return SentenceTransformer(model_name)


# URL prefix under which the Gradio UI is mounted on the FastAPI app.
CUSTOM_PATH = "/angela"

app = FastAPI()


@app.get("/")
def read_main():
    """Root endpoint of the FastAPI app; returns a static message."""
    return {"message": "This is your main app"}


def loadCopysAndData(pathsqlite="motor.sqlite"):
    """Load the copies flagged as starters from the sqlite database.

    Reads the ``copies`` table, keeps the rows whose ``copy_start`` is ``"T"``
    and builds two document lists from the same rows.

    Args:
        pathsqlite: Path of the sqlite database file.

    Returns:
        A tuple ``(documents, documents2)``: the first uses ``copy_message``
        as page content, the second uses ``intencionality``.
    """
    con = sqlite3.connect(pathsqlite)
    try:
        copies_df = pd.read_sql_query("SELECT * from copies", con)
    finally:
        # The connection is only needed for the query; do not leak it.
        con.close()
    starters = copies_df[copies_df.copy_start == "T"]
    data = starters[["copy_message", "id", "name", "intencionality"]]
    documents = DataFrameLoader(data, page_content_column="copy_message").load()
    documents2 = DataFrameLoader(data, page_content_column="intencionality").load()
    return documents, documents2


def makeFaissdb(documents, folder_path, embedding):
    """Return the FAISS index stored in *folder_path*, building it if needed.

    When loading fails for any reason, the index is rebuilt from *documents*
    and saved to *folder_path*. An index that loads successfully is returned
    as is; it is not compared against *documents*.

    Args:
        documents: Documents used to build the index when it cannot be loaded.
        folder_path: Folder the index is loaded from / saved to.
        embedding: Embeddings object used for loading and building.

    Returns:
        The loaded or freshly built FAISS vector store.
    """
    try:
        db = FAISS.load_local(folder_path=folder_path, embeddings=embedding)
    except Exception:
        # Narrower than a bare ``except``: Ctrl-C / SystemExit still propagate.
        logger.info("Could not load FAISS index from %r; rebuilding it", folder_path)
        db = FAISS.from_documents(documents, embedding)
        FAISS.save_local(db, folder_path=folder_path)
    return db


documents, documents2 = loadCopysAndData()
emb = loadmodelEmb(model_name=model)
emb2 = CustomEmbedding()
# Vector stores: one indexed by copy message, one by intencionality.
db = makeFaissdb(documents, "Copies3", emb2)
db2 = makeFaissdb(documents2, "Intencionality3", emb2)


def FinderDbs(query, dbs, filtred=False, th=1.2):
    """Search *query* in every store of *dbs* and merge the hits by document id.

    Each store contributes its 4 best hits. A document found more than once
    keeps the lowest score seen, with 0.1 subtracted from both candidates, so
    repeated hits rank better.

    Args:
        query: Text to search for.
        dbs: Iterable of vector stores exposing
            ``similarity_search_with_score``.
        filtred: When true, keep only the hits whose score is below *th*.
        th: Score threshold applied when *filtred* is true.

    Returns:
        A tuple ``(results, results.keys())`` where ``results`` maps each
        document id to ``{"d": score, "page_content": text}``, ordered by
        ascending score.
    """
    merged = {}
    for store in dbs:
        hits = store.similarity_search_with_score(query, 4)
        print(hits)
        for doc, score in hits:
            doc_id = doc.metadata["id"]
            if doc_id in merged:
                merged[doc_id]["d"] = min(merged[doc_id]["d"] - 0.1, score - 0.1)
            else:
                merged[doc_id] = {"d": score, "page_content": doc.page_content}
    if filtred:
        # Honor the ``th`` argument instead of a hard-coded 1.2.
        merged = {key: hit for key, hit in merged.items() if hit["d"] < th}
    ranked = dict(sorted(merged.items(), key=lambda item: item[1]["d"]))
    return ranked, ranked.keys()


def QARequest(Pregunta, filtred=False):
    """Answer a UI/API request with the ids and scores of the best matches.

    The search runs against the intencionality index only. The HTML rendering
    of the answers is not produced here because it was never part of the
    returned value.

    Args:
        Pregunta: Question typed by the user.
        filtred: Forwarded to ``FinderDbs`` to drop hits above its threshold.

    Returns:
        A tuple ``(ids, scores, version)``: the matching document ids, their
        scores as strings (same order), and ``"<model>_<entrenamiento>"``.
    """
    results, _ = FinderDbs(Pregunta, [db2], filtred)
    versionL = "_".join([model, entrenamiento])
    ids = list(results)
    scores = [str(hit["d"]) for hit in results.values()]
    return ids, scores, versionL


with gr.Blocks() as demo:
    gr.Image("logo.jpg", height=100)
    gr.Markdown("Esta es la busqueda que hace el usuario")
    Pregunta = gr.Textbox(label="Pregunta")
    filtred = gr.Checkbox(label="filtrado")
    gr.Markdown("Respuestas para orca desde los copys")
    # Displayed but not wired to any output of the click handler below.
    Respuesta = gr.Textbox(label="Respuesta")
    # Named ``id_box`` so the builtin ``id`` is not shadowed module-wide.
    id_box = gr.Textbox(label="id")
    metrica = gr.Textbox(label="metrica")
    version = gr.Textbox(label="version")
    Enviar_btn = gr.Button("Responder")
    Enviar_btn.click(
        fn=QARequest,
        inputs=[Pregunta, filtred],
        outputs=[id_box, metrica, version],
        api_name="api_angela",
    )

# Serve the Gradio UI under CUSTOM_PATH of the FastAPI application.
gradio_app = gr.routes.App.create_app(demo)
app.mount(CUSTOM_PATH, gradio_app)