LLm2Node/main.py

216 lines
7.5 KiB
Python

import gradio as gr
from faiss import write_index, read_index
from typing import List
#from langchain import PromptTemplate
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import UnstructuredFileLoader
from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
from langchain.document_loaders import UnstructuredURLLoader
from langchain.document_loaders.csv_loader import CSVLoader
#from langchain import LLMChain
from langchain.pydantic_v1 import BaseModel
from langchain.schema.embeddings import Embeddings
from langchain.document_loaders import DataFrameLoader
from langchain.embeddings import HuggingFaceEmbeddings
import pandas as pd
import sqlite3
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI
from unidecode import unidecode
from nltk.corpus import stopwords
#from cleantext import clean
import re
class CustomEmbedding(Embeddings, BaseModel,):
    """Embedding model that preprocesses text before encoding it.

    Every text is first cleaned with ``remove_unwanted`` (punctuation and
    stop-word removal enabled) and then encoded with the module-level
    ``emb`` model.
    """

    def _get_embedding(self, text) -> List[float]:
        """Clean *text* and return whatever ``emb.encode`` produces for it."""
        cleaned = remove_unwanted(text, punctuationOK=True, stopOK=True)
        return emb.encode(cleaned)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding per entry of *texts*, in the same order."""
        return [self._get_embedding(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """Return the embedding of a single query string."""
        return self._get_embedding(text)
def remove_emoji(string):
    """Replace each run of characters in the emoji ranges with one space.

    A run of consecutive matching characters collapses into a single space;
    all other characters are returned unchanged.
    """
    # NOTE(review): the last range (U+24C2..U+1F251) is very wide and also
    # covers many non-emoji code points (e.g. CJK ideographs) — confirm
    # that this is intended.
    emoji_ranges = (
        u"\U0001F600-\U0001F64F",  # emoticons
        u"\U0001F300-\U0001F5FF",  # symbols & pictographs
        u"\U0001F680-\U0001F6FF",  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF",  # flags (iOS)
        u"\U00002702-\U000027B0",
        u"\U000024C2-\U0001F251",
    )
    character_class = "[" + "".join(emoji_ranges) + "]+"
    matcher = re.compile(character_class, flags=re.UNICODE)
    return matcher.sub(r' ', string)
def remove_unwanted(document,stopOK=False,punctuationOK=False,xtrasOK=False, emojiOk=False, unidecodeOK=False):
    """Normalise *document* for embedding and return it stripped and lower-cased.

    Each flag, when true, ENABLES the corresponding removal step:

    Args:
        document: Text to clean.
        stopOK: Drop Spanish stop words (``nltk`` ``stopwords.words('spanish')``).
            Matching is case-insensitive.
        punctuationOK: Replace ``. , ! ¿ ? = ( )`` with spaces.
        xtrasOK: Remove ``@mentions``, ``http...`` URLs and ``#hashtags``.
        emojiOk: Replace emoji with spaces via ``remove_emoji``.
        unidecodeOK: Transliterate the text with ``unidecode``.

    Returns:
        The cleaned text with runs of spaces collapsed to one space,
        leading/trailing whitespace stripped, and lower-cased.
    """
    if punctuationOK:
        # One pass over the string instead of one ``replace`` per symbol.
        punctuation_table = str.maketrans({sig: " " for sig in ".,!¿?=()"})
        document = document.translate(punctuation_table)
    if xtrasOK:
        # remove user mentions
        document = re.sub("@[A-Za-z0-9_]+", " ", document)
        # remove URLs
        document = re.sub(r'http\S+', ' ', document)
        # remove hashtags
        document = re.sub("#[A-Za-z0-9_]+", "", document)
    if emojiOk:
        document = remove_emoji(document)
    if unidecodeOK:
        document = unidecode(document)
    if stopOK:
        stop_words = set(stopwords.words('spanish'))
        # Compare lower-cased tokens: the text is only lower-cased at the very
        # end, so a capitalised stop word would otherwise slip through.
        document = " ".join(
            w for w in document.split(" ") if w.lower() not in stop_words
        )
    # Collapse the runs of spaces left behind by the substitutions above.
    # Deleting every space here would fuse all words into a single token.
    document = re.sub(r" {2,}", " ", document)
    return document.strip().lower()
def loadmodelEmb(model_name = "embeddings/all-MiniLM-L6-v2",model_kwargs = None):
    """Load and return a ``SentenceTransformer`` model.

    Args:
        model_name: Model name or path handed to ``SentenceTransformer``.
        model_kwargs: Optional dict of settings. Only its ``"device"`` entry
            is used and is forwarded to ``SentenceTransformer``. When omitted
            (the default) no device is forced, which matches the previous
            behaviour, where this argument was accepted but never used.

    Returns:
        The loaded ``SentenceTransformer`` instance.
    """
    # ``None`` default instead of a shared mutable dict.
    device = (model_kwargs or {}).get("device")
    return SentenceTransformer(model_name, device=device)
# Load the default sentence-embedding model at import time.
# NOTE(review): ``emb`` is reassigned further down by
# ``emb=loadmodelEmb(model_name = model)``, so this first load looks
# redundant — confirm before removing.
emb=loadmodelEmb()
# URL path under which the Gradio UI is mounted on the FastAPI app.
CUSTOM_PATH = "/angela"
# FastAPI application that hosts the root endpoint and the mounted Gradio app.
app = FastAPI()
@app.get("/")
def read_main():
    """Root endpoint: return a fixed greeting payload."""
    greeting = {"message": "This is your main app"}
    return greeting
def loadCopysAndData(pathsqlite="motor.sqlite"):
    """Load the ``copies`` table and build two document lists from it.

    Only rows whose ``copy_start`` column equals ``"T"`` are kept, restricted
    to the columns ``copy_message``, ``id``, ``name`` and ``intencionality``.

    Args:
        pathsqlite: Path of the SQLite database file.

    Returns:
        A tuple ``(documents, documents2)``: the same rows loaded through
        ``DataFrameLoader`` with ``copy_message`` and with ``intencionality``
        as the page-content column, respectively.
    """
    con = sqlite3.connect(pathsqlite)
    try:
        copies_df = pd.read_sql_query("SELECT * from copies", con)
    finally:
        # The connection used to be left open; release it even if the query fails.
        con.close()
    copiesT = copies_df[copies_df.copy_start == "T"]
    data = copiesT[["copy_message", "id", "name", "intencionality"]]
    print(data)
    documents = DataFrameLoader(data, page_content_column="copy_message").load()
    documents2 = DataFrameLoader(data, page_content_column="intencionality").load()
    return documents, documents2
def makeFaissdb(documents,folder_path,embedding):
    """Return a FAISS store for *folder_path*, building it when loading fails.

    Args:
        documents: Documents used to build the index if it cannot be loaded.
        folder_path: Folder the index is loaded from / saved to.
        embedding: Embeddings object used for loading and for building.

    Returns:
        The loaded or newly built ``FAISS`` vector store.
    """
    try:
        db = FAISS.load_local(folder_path=folder_path, embeddings=embedding)
    except Exception:
        # Deliberate best-effort fallback: any load failure triggers a rebuild.
        # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        db = FAISS.from_documents(documents, embedding)
        db.save_local(folder_path=folder_path)
    return db
#llm,emb=loadModels()
# Module-level setup, executed at import time. Order matters: ``emb`` must be
# assigned before the FAISS stores are built, because
# ``CustomEmbedding._get_embedding`` reads the module-level ``emb``.
model="embeddings/all-mpnet-base-v2"
documents,documents2=loadCopysAndData()
# Replaces the ``emb`` model loaded earlier in the module.
emb=loadmodelEmb(model_name = model)
emb2=CustomEmbedding()
# ``db`` is built from the ``copy_message`` documents, ``db2`` from the
# ``intencionality`` documents; ``QARequest`` queries only ``db2``.
db=makeFaissdb(documents,"Copies3",emb2)
db2=makeFaissdb(documents2,"Intencionality3",emb2)
#db3=makeFaissdb(documents2,"nameshf",hf)
def FinderDbs(query,dbs,filtred=False,th=1.2):
    """Search every store in *dbs* and merge the hits by document id.

    Each store is asked for its 4 best matches through
    ``similarity_search_with_score``. Hits are keyed by ``metadata["id"]``.
    When an id shows up more than once, its score becomes the smaller of the
    stored and the new score, each reduced by 0.1. Results are ordered by
    ascending score.

    Args:
        query: Text to search for.
        dbs: Iterable of stores exposing ``similarity_search_with_score``.
        filtred: When true, keep only hits whose score is below *th*.
        th: Score threshold used when *filtred* is true.

    Returns:
        A tuple ``(hits, hits.keys())`` where ``hits`` maps each id to
        ``{"d": score, "page_content": text}``.
    """
    AllData = {}
    for dbt in dbs:
        Sal = dbt.similarity_search_with_score(query, 4)
        print(Sal)
        for doc, score in Sal:
            doc_id = doc.metadata["id"]
            if doc_id in AllData:
                AllData[doc_id]["d"] = min(AllData[doc_id]["d"] - 0.1, score - 0.1)
            else:
                AllData[doc_id] = {"d": score, "page_content": doc.page_content}
    if filtred:
        # Honour the ``th`` argument; the threshold used to be hard-coded to 1.2.
        AllData = {key: hit for key, hit in AllData.items() if hit["d"] < th}
    ranked = dict(sorted(AllData.items(), key=lambda item: item[1]["d"]))
    return ranked, ranked.keys()
def QARequest(Pregunta,filtred=False):
    """Answer a question with an HTML list of the best matches from ``db2``.

    Args:
        Pregunta: The user's question text.
        filtred: Forwarded to ``FinderDbs`` to enable score filtering.

    Returns:
        A two-item list ``[html, ids]``: the rendered HTML (one card per hit,
        showing its page content and score) and the keys view of matching ids.
    """
    import markdown

    data, ids = FinderDbs(Pregunta, [db2], filtred)
    cards = []
    for k, item in enumerate(data.items()):
        hit = item[1]
        # ``border-style: solid`` — the former ``border-style = solid`` is not
        # a valid CSS declaration, so the border was never drawn.
        titulo = f"<div style='border-style: solid;border-width:1px;border-radius:10px;margin:14px;padding:14px'><h2>Respuesta {k+1}</h2>"
        to_append = markdown.markdown(hit['page_content'])
        to_append2 = markdown.markdown(str(hit['d']))
        print(item)
        cards.append(titulo + to_append + to_append2 + '</div>')
    return ["".join(cards), ids]
# Build the Gradio UI: a question box and a filter checkbox feed QARequest,
# whose two return values go to the outputs wired on the button below.
with gr.Blocks() as demo:
    gr.Image("logo.jpg",height=100)
    gr.Markdown("Esta es la busqueda que hace el usuario")
    Pregunta = gr.Textbox(label="Pregunta")
    #Pregunta = re.sub(r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?", "", Pregunta)
    #Pregunta=Pregunta.strip().lower()
    filtred=gr.Checkbox(label="filtrado")
    gr.Markdown("Respuestas para orca desde los copys")
    Respuesta = gr.Textbox(label="Respuesta")
    # NOTE(review): this name shadows the builtin ``id`` at module level.
    id = gr.Textbox(label="id")
    # metrica=gr.Textbox(label="metrica")
    # gr.Markdown("Respuestas para orca desde los names")
    # Respuesta2 = gr.Textbox(label="Respuesta2")
    # id2 = gr.Textbox(label="id2")
    # metrica2=gr.Textbox(label="metrica2")
    # gr.Markdown("Respuestas para hf desde los names")
    # Respuesta3 = gr.Textbox(label="Respuesta3")
    # id3 = gr.Textbox(label="id3")
    # metrica3=gr.Textbox(label="metrica3")
    Enviar_btn = gr.Button("Responder")
    # NOTE(review): the first output is a new gr.HTML component created with
    # the ``Respuesta`` Textbox as its argument, not the Textbox itself —
    # confirm this is intended.
    Enviar_btn.click(fn=QARequest, inputs=[Pregunta,filtred], outputs=[gr.HTML(Respuesta),id], api_name="api_angela") #
#demo.launch(root_path="angela") #
# Wrap the Blocks UI as an app and mount it on the FastAPI app at CUSTOM_PATH.
gradio_app = gr.routes.App.create_app(demo)
app.mount(CUSTOM_PATH, gradio_app)
#app = demo.mount_gradio_app(app, io, path=CUSTOM_PATH)