from fastapi import FastAPI, Request, Query, File, UploadFile, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import time
#from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.cors import CORSMiddleware
import main
import os
from databases import db
import audioread
import pandas as pd
import statistics
import hashlib
from datetime import datetime
import json
import uuid
import shutil
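# The `db` object imported from `databases` is used below with pydal/web2py-style
# queries. Based only on how it is used in this module, it is assumed to define at
# least these tables and columns (a sketch, not the authoritative schema):
#   trusted(path, trusted, mode, size, duration, length, hash, modification_count, last_modified)
#   prompt(prompt, mode, length, hash, last_modified)
#   analitic_voice(path, model, time, similarity, similaritypartial, last_modified)
#   analitic_llm_compra(path, model, relevance, bias, toxic, correctness,
#                       relevance_r, bias_r, toxic_r, correctness_r, last_modified)
#   analitic_llm_generaciontexto(same columns as analitic_llm_compra)
#   analitic_ocr(path, model, similarity, similaritypartial, jsonok)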
pwd = os.getcwd()
pathAud="example/audio"
pathFact="example/factura"
pathText="example/texto"
def extractConfig(nameModel="SystemData", relPath=os.path.join(pwd, "conf/experiment_config.json"), dataOut="keyantrophics"):
    """Read one key (dataOut) from the nameModel section of the JSON config file."""
    configPath = os.path.join(os.getcwd(), relPath)
    with open(configPath, 'r', encoding='utf-8') as file:
        config = json.load(file)[nameModel]
    return config[dataOut]
mode_list=extractConfig(nameModel="SystemData",dataOut="mode_list")
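# Illustrative sketch (not the real file) of the shape conf/experiment_config.json is
# assumed to have, inferred from extractConfig() and the uses of mode_list below: each
# mode name maps to the kind of artifact it evaluates ("texto", "factura" or "audio").
#
# {
#   "SystemData": {
#     "keyantrophics": "<api-key>",
#     "mode_list": {
#       "voice": "audio",
#       "ocr": "factura",
#       "llm_factura": "factura",
#       "llm_compra": "texto",
#       "llm_generaciontexto": "texto",
#       "llm_rag": "texto"
#     }
#   }
# }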
app = FastAPI()
#app.mount("/statics", StaticFiles(directory="statics"), name="statics")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Response(BaseModel):
    """Request body for /EvalVoice: the audio file to evaluate and the model to use."""
    path: str = Query("", description="Path of the audio file to evaluate")
    model: str = Query("whisper", description="Speech-to-text model to evaluate (whisper or vosk)")
class Response4(BaseModel):
    """Request body for the LLM evaluation endpoints (/EvalLLMCompra, /EvalLLMGeneracionTexto)."""
    path: str = Query("", description="Path (or text) previously registered as trusted data")
    system: str = Query("", description="System prompt for the LLM")
    content: str = Query("%s", description="Content template for the LLM prompt")
    max_tokens: int = Query(1024, description="Maximum tokens for the LLM response")
    model: str = Query("Claude-sonnet", description="LLM model name")
class Response1(BaseModel):
    """Request body for /EvalFact (OCR evaluation of invoices)."""
    path: str = Query("", description="Path of the file")
    task_prompt: str = Query("", description="Task prompt for the model")
    model: str = Query("", description="Model name")
    TrustedOCR: str = Query("", description="Trusted (reference) OCR output")
    option: str = Query("", description="OCR model option")
class Response2(BaseModel):
    """Request body combining OCR and LLM parameters (Claude with image)."""
    path: str = Query("", description="Path of the file")
    task_prompt: str = Query("", description="Task prompt for the model")
    system: str = Query("", description="System prompt for the LLM (OCR and Claude image models)")
    content: str = Query("%s", description="Content prompt for the LLM used with OCR")
    max_tokens: int = Query(1024, description="Maximum tokens for the LLM/OCR model")
    model: str = Query("Claude-sonnet", description="Model name")
    prompt: str = Query("", description="Prompt for Claude when an image is attached")
    TrustedLLmjson: str = Query("", description="Trusted (reference) LLM JSON output")
class Response3(BaseModel):
    """Request body for /addTrusted: trusted (reference) data for a given mode."""
    path: str = Query("", description="Path of the file, or the text itself for text modes")
    Trusted: str = Query("", description="Trusted (reference) information for the process")
    mode: str = Query("", description="One of: llm_compra, llm_factura, llm_generaciontexto, llm_rag, ocr, voice")
class Response5(BaseModel):
    """Request body for /addPrompt: a prompt to register for a given mode."""
    prompt: str = Query("", description="Prompt text to store")
    mode: str = Query("", description="Mode the prompt belongs to (e.g. llm_compra, llm_generaciontexto)")
# Functional endpoints
@app.post("/uploadimg")
def upload_image(image: UploadFile = File(...),type="factura"):
endfile=".jpg"
t=time.time()
print(image.headers)
try:
# Create a temporary file to store the uploaded audio
headfilename=uuid.uuid4()
filename="example/%s/"%(type)+str(headfilename)+endfile
with open(f"{filename}", "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
return JSONResponse(content={
"message": filename,
"time":time.time()-t
}, status_code=200)
except Exception as e:
return JSONResponse(content={
"message": f"There was an error uploading the file: {str(e)}"
}, status_code=500)
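# Example client call for /uploadimg (illustrative only: the host/port and file name
# are assumptions; the multipart field "image" and query parameter "type" come from
# the signature above).
#
#   import requests
#   with open("invoice.jpg", "rb") as f:
#       r = requests.post("http://127.0.0.1:8000/uploadimg",
#                         params={"type": "factura"},
#                         files={"image": ("invoice.jpg", f, "image/jpeg")})
#   print(r.json())  # {"message": "example/factura/<uuid>.jpg", "time": ...}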
@app.post("/uploadaud")
def upload_audio(audio: UploadFile = File(...)):
if audio.headers['content-type']=="audio/wav":
endfile=".wav"
if audio.headers['content-type']=="audio/ogg":
endfile=".ogg"
if audio.headers['content-type']=="video/webm":
endfile=".webm"
type="audio"
try:
# Create a temporary file to store the uploaded audio
headfilename=uuid.uuid4()
filename="example/%s/"%(type)+str(headfilename)+endfile
with open(f"{filename}", "wb") as buffer:
shutil.copyfileobj(audio.file, buffer)
# Here you can process the audio file as needed
# For example, you might want to move it to a permanent location
# or perform some operations on it
#payload={"password":GetText2Voice.extractConfig(nameModel="SystemData",relPath=os.path.join(pwd,"conf/experiment_config.json"),dataOut="password"),"local":filename}
#A=requests.get("http://127.0.0.1:7870/voice2txtlocal", json=payload)
t=time.time()
return JSONResponse(content={
"message": filename,
"time":time.time()-t
}, status_code=200)
except Exception as e:
return JSONResponse(content={
"message": f"There was an error uploading the file: {str(e)}"
}, status_code=500)
finally:
audio.file.close()
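# Example upload for /uploadaud (illustrative; host/port and file are assumptions).
# The Content-Type of the multipart field decides the stored extension (.wav, .ogg or .webm):
#
#   curl -X POST http://127.0.0.1:8000/uploadaud \
#        -F "audio=@sample.wav;type=audio/wav"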
@app.get("/addaudiohtml")
def addaudiohtml():
html="""
"""
return HTMLResponse(content=html, status_code=200)
@app.get("/addTrusted")
@app.post("/addTrusted")
def addTrusted(response: Response3):
    """Register or update trusted (reference) data for a mode.

    Args:
        response (Response3): three fields:
            path: path of the file on disk, or the text itself for text modes.
            Trusted: the trusted (reference) information for the process.
            mode: one of llm_compra, llm_factura, llm_generaciontexto, llm_rag, ocr, voice.

    Returns:
        A confirmation string, or a 404 JSONResponse when the mode or file is not found.
    """
path=response.path
Trusted=response.Trusted
mode=response.mode
last_modified=datetime.now()
if mode not in mode_list.keys():
return JSONResponse(
status_code=404,
content={"content": "mode no found" }
)
if mode == "llm_factura" or mode == "ocr" or mode == "voice":
if not os.path.isfile(path):
return JSONResponse(
status_code=404,
content={"content": "file no found" }
)
if mode_list[mode]=="texto":
info=str({"path":path,"trusted":Trusted,"mode":mode})
hash1 = hashlib.sha256(info.encode()).hexdigest()
# with open("example/texto/"+hash1, 'w') as f:
# json.dump(info, f)
# path=pwd+"/"+pathText+hash1
length=len(Trusted)
size=0
duration=0
elif mode_list[mode]=="factura":
file_stats = os.stat(path)
size=file_stats.st_size / (1024 * 1024)
length=0
duration=0
hash1=""
elif mode_list[mode]=="audio":
with audioread.audio_open(path) as f:
duration = f.duration
length=0
size=0
hash1=""
if db((db.trusted.path == path)&(db.trusted.mode == mode)).count()==0:
db.trusted.insert(path=path,trusted=Trusted,mode=mode,size=size,duration=duration,last_modified=last_modified,length=length,hash=hash1 )
db.commit()
return "Add %s in mode %s"%(path,mode)
else:
item=db((db.trusted.path == path)&(db.trusted.mode == mode)).select().last()
modification_count=item.modification_count + 1
db((db.trusted.path == path)&(db.trusted.mode == mode)).update(trusted=Trusted,size=size,duration =duration,length=length,last_modified=last_modified,modification_count= modification_count,hash=hash1)
db.commit()
return "Update %s in mode %s"%(path,mode)
@app.get("/addPrompt")
@app.post("/addPrompt")
def addPrompt(response: Response5):
    """Register or update a prompt for a mode.

    Args:
        response (Response5): two fields:
            prompt: the prompt text to store.
            mode: the mode the prompt belongs to (llm_compra, llm_generaciontexto).

    Returns:
        A confirmation string, or a 404 JSONResponse when the mode is not found.
    """
prompt=response.prompt
mode=response.mode
last_modified=datetime.now()
if mode not in mode_list.keys():
return JSONResponse(
status_code=404,
content={"content": "mode no found" }
)
if mode == "llm_compra" or mode == "llm_generaciontexto":
hash1 = str(hashlib.sha256(prompt.encode()).hexdigest())
# with open("example/texto/"+hash1, 'w') as f:
# json.dump(info, f)
# path=pwd+"/"+pathText+hash1
length=len(prompt)
if db((db.prompt.hash == hash1)&(db.prompt.mode == mode)).count()==0:
db.prompt.insert(prompt=prompt,mode=mode,last_modified=last_modified,length=length,hash=hash1 )
db.commit()
return "Add %s in mode %s"%(prompt,mode)
else:
A=db((db.prompt.hash == hash1)&(db.prompt.mode == mode)).update(prompt=prompt,mode=mode,last_modified=last_modified,length=length+1,hash=hash1)
db.commit()
print(A,last_modified)
return "Update %s in mode %s"%(prompt,mode)
@app.get("/EvalVoice")
@app.post("/EvalVoice")
def EvalVoice(response:Response):
path=response.path
model=response.model
if db((db.trusted.path == path ) & ( db.trusted.mode == "voice")).count()==0:
return JSONResponse(
status_code=404,
content={"content": "Trusted no found" }
)
Trusted=db((db.trusted.path == path ) & ( db.trusted.mode == "voice")).select().last().trusted
if model=="whisper":
Sal=main.EvalWhisper(path,Trusted)
else:
Sal=main.EvalVosk(path,Trusted)
Sal["last_modified"]=datetime.now()
if db((db.analitic_voice.path == Sal["path"]) & (db.analitic_voice.model == Sal["model"])).count()==0:
print(1,Sal)
db.analitic_voice.insert(**Sal)
db.commit()
else:
print(2,Sal)
db((db.analitic_voice.path == Sal["path"]) & (db.analitic_voice.model == Sal["model"])).update(similarity= Sal["similarity"],similaritypartial= Sal["similaritypartial"],last_modified=Sal["last_modified"])
db.commit()
return Sal
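# Typical evaluation flow for a voice sample (a sketch; paths and values are assumptions):
#   1. POST /uploadaud  -> returns the stored path, e.g. "example/audio/<uuid>.wav"
#   2. POST /addTrusted -> {"path": <stored path>, "Trusted": "<reference transcript>", "mode": "voice"}
#   3. POST /EvalVoice  -> {"path": <stored path>, "model": "whisper"}  (any other value falls through to the Vosk evaluator)
# The metrics are returned as JSON and inserted into (or updated in) db.analitic_voice.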
@app.get("/evalvoicehtml")
def EvalVoicehtml():
dir_list = os.listdir(pathAud)
Sal=""
t=1
for i in dir_list:
temp="""
"""%(str(pwd+"/"+pathAud+"/"+i),str(t),str(i))
Sal=Sal+temp
t=t+1
html="""
Evaluacion de modelos voice2txt
Petición Evaluar modelo de voz contra datos curados
"""%(Sal)
return HTMLResponse(content=html, status_code=200)
@app.get("/EvalLLMCompra")
@app.post("/EvalLLMCompra")
def EvalLLMCompra(response:Response4):
content=response.path
model=response.model
system= response.system
max_tokens= response.max_tokens
path=content
if db((db.trusted.path == path ) & ( db.trusted.mode == "llm_compra")).count()==0:
return JSONResponse(
status_code=404,
content={"content": "Trusted no found" }
)
Trusted=db((db.trusted.path == path ) & ( db.trusted.mode == "llm_compra")).select().last().trusted
Sal=main.EvalModelLLMCompra(system,content,model,max_tokens,Trusted)
Sal["last_modified"]=datetime.now()
if db((db.analitic_llm_compra.path == Sal["path"]) & (db.analitic_llm_compra.model == Sal["model"])).count()==0:
print(1,Sal)
db.analitic_llm_compra.insert(**Sal)
db.commit()
else:
print(2,Sal)
db((db.analitic_llm_compra.path == Sal["path"]) & (db.analitic_llm_compra.model == Sal["model"])).update(last_modified=Sal["last_modified"],relevance=Sal["relevance"],bias=Sal["bias"],toxic=Sal["toxic"],correctness=Sal["correctness"],relevance_r=Sal["relevance_r"],bias_r=Sal["bias_r"],toxic_r=Sal["toxic_r"],correctness_r=Sal["correctness_r"])
db.commit()
return Sal
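# Example body for /EvalLLMCompra (illustrative; values are assumptions). The "path"
# value must match a record previously registered with /addTrusted in mode "llm_compra":
#
#   {"path": "<path or text registered as trusted>",
#    "system": "<system prompt for the purchase-extraction LLM>",
#    "content": "%s",
#    "max_tokens": 1024,
#    "model": "Claude-sonnet"}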
@app.get("/evalllmcomprahtml")
def EvalLLMComprahtml():
dir_list = db((db.trusted.mode == "llm_compra" )).select()
Sal=""
t=1
for i in dir_list:
temp="""
"""%(i.path,str(t),str(i.path))
Sal=Sal+temp
t=t+1
dir_list2 = db((db.prompt.mode == "llm_compra" )).select()
Sal2=""
t=1
for i in dir_list2:
temp="""
"""%(i.prompt,str(t),str(i.prompt))
Sal2=Sal2+temp
t=t+1
html="""
Evaluacion de modelos LLM
Petición Evaluar modelo de LLM para evaluar compras contra datos curados
"""%(Sal,Sal2)
return HTMLResponse(content=html, status_code=200)
#
@app.get("/EvalLLMGeneracionTexto")
@app.post("/EvalLLMGeneracionTexto")
def EvalLLMGeneracionTexto(response:Response4):
content=response.path
model=response.model
system= response.system
max_tokens= response.max_tokens
path=content
if db((db.trusted.path == path ) & ( db.trusted.mode == "llm_generaciontexto")).count()==0:
return JSONResponse(
status_code=404,
content={"content": "Trusted no found" }
)
Trusted=db((db.trusted.path == path ) & ( db.trusted.mode == "llm_generaciontexto")).select().last().trusted
Sal=main.EvalModelLLMCompra(system,content,model,max_tokens,Trusted)
Sal["last_modified"]=datetime.now()
if db((db.analitic_llm_generaciontexto.path == Sal["path"]) & (db.analitic_llm_generaciontexto.model == Sal["model"])).count()==0:
print(1,Sal)
db.analitic_llm_generaciontexto.insert(**Sal)
db.commit()
else:
print(2,Sal)
db((db.analitic_llm_generaciontexto.path == Sal["path"]) & (db.analitic_llm_generaciontexto.model == Sal["model"])).update(last_modified=Sal["last_modified"],relevance=Sal["relevance"],bias=Sal["bias"],toxic=Sal["toxic"],correctness=Sal["correctness"],relevance_r=Sal["relevance_r"],bias_r=Sal["bias_r"],toxic_r=Sal["toxic_r"],correctness_r=Sal["correctness_r"])
db.commit()
return Sal
@app.get("/evalllmgeneraciontextohtml")
def EvalLLMGeneracionTextohtml():
dir_list = db((db.trusted.mode == "llm_generaciontexto" )).select()
Sal=""
t=1
for i in dir_list:
temp="""
"""%(i.path,str(t),str(i.path))
Sal=Sal+temp
t=t+1
dir_list2 = db((db.prompt.mode == "llm_generaciontexto" )).select()
Sal2=""
t=1
for i in dir_list2:
temp="""
"""%(i.prompt,str(t),str(i.prompt))
Sal2=Sal2+temp
t=t+1
html="""
Evaluacion de modelos LLM
Petición Evaluar modelo de LLM para generar texto contra datos curados
"""%(Sal,Sal2)
return HTMLResponse(content=html, status_code=200)
#
@app.get("/EvalFact")
@app.post("/EvalFact")
def EvalFact(response:Response1):
path=response.path
task_prompt=response.task_prompt
option=response.model
TrustedOCR=response.TrustedOCR
Trusted=TrustedOCR
if task_prompt=="":
if Trusted=="":
            row = db((db.trusted.path == path) & (db.trusted.mode == "OCR")).select().first()
try:
Trusted=row.trusted
except:
pass
Sal=main.EvalFacturas(path,task_prompt,TrustedOCR,option)
Sal["path"]=path
if db(db.analitic_ocr.path == Sal["path"] and db.analitic_ocr.model == Sal["model"]).count()==0:
db.analitic_ocr.insert(**Sal)
db.commit()
else:
db(db.analitic_ocr.path == Sal["path"] and db.analitic_ocr.model == Sal["model"]).update(similarity= Sal["similarity"],similaritypartial= Sal["similaritypartial"],jsonok=Sal["jsonok"])
db.commit()
return Sal
@app.get("/evalocrfactura")
def EvalOCRFactura():
dir_list = os.listdir(pathFact)
Sal=""
t=1
for i in dir_list:
temp="""
"""%(str(pwd+"/"+pathFact+"/"+i),str(t),str(i))
Sal=Sal+temp
t=t+1
html="""
Evaluacion de modelos OCR
Petición POST a API
"""%(Sal)
return HTMLResponse(content=html, status_code=200)
@app.get("/evalllmfacturas")
def EvalllmFacturas():
dir_list = os.listdir(pathFact)
Sal=""
t=1
for i in dir_list:
temp="""
"""%(str(pwd+"/"+pathFact+"/"+i),str(t),str(i))
Sal=Sal+temp
t=t+1
html="""
Evaluacion modelos LLM
"""
    return HTMLResponse(content=html, status_code=200)
def tableVoice(model):
rows = db(db.analitic_voice.model==model).select()
rows_list = rows.as_list()
data=pd.DataFrame(rows_list)
durationL=list()
for i in rows_list:
durationL.append(db(db.trusted.path == i["path"] ).select().last().duration)
duration=statistics.mean(durationL)
    # Aggregate the per-row metrics into one value per model
    pivot = pd.pivot_table(data, values=['time', 'similarity', 'similaritypartial'], index="model")
    time = pivot['time'].values[0]
    similarity = pivot['similarity'].values[0]
    similaritypartial = pivot['similaritypartial'].values[0]
efectivetime=time/duration
card="""
{0}
time of process (sg)
{1}
similarity
{2}
similaritypartial
{3}
time of audio(sg)
{4}
time in process
{5}
""".format(model,time,similarity,similaritypartial,duration,efectivetime)
return {"duration":duration,"time":time,"similarity":similarity,"similaritypartial":similaritypartial,"card":card,"data":list2tablehtml(rows_list,model)}
@app.get("/getmetricsvoice")
def getMetricsVoice():
    models = list()
for row in db().select(db.analitic_voice.model, distinct=True):
models.append(row.model)
cards=""
dataAll=""
for model in models:
Sal=tableVoice(model)
cards=cards+Sal["card"]
dataAll=dataAll+Sal["data"]
htmlhead="""
Evaluacion de modelos voice2txt
"""
htmlbody="""
""".format(model,time,similarity,similaritypartial)
return {"time":time,"similarity":similarity,"similaritypartial":similaritypartial,"card":card,"data":list2tablehtmlOCR(rows_list,model)}
@app.get("/getmetricsocr")
def getMetricsOCR():
models=list()
for row in db().select(db.analitic_ocr.model, distinct=True):
models.append(row.model)
cards=""
dataAll=""
for model in models:
Sal=tableOCR(model)
cards=cards+Sal["card"]
dataAll=dataAll+Sal["data"]
htmlhead="""
Evaluacion de modelos OCR
"""
htmlbody="""