# NOTE(review): duplicated file-size/line-count banner left by extraction removed.
from pydantic import BaseModel
|
|
from anthropic import Anthropic
|
|
import instructor
|
|
from deepeval.models import DeepEvalBaseLLM
|
|
from deepeval.metrics import AnswerRelevancyMetric
|
|
from deepeval.test_case import LLMTestCase
|
|
from deepeval.metrics import BiasMetric
|
|
from deepeval.metrics import ToxicityMetric
|
|
from deepeval.metrics import GEval
|
|
from deepeval.test_case import LLMTestCaseParams
|
|
from deepdiff import DeepDiff
|
|
import json
|
|
import os
|
|
pwd = os.getcwd()
|
|
def extractConfig(nameModel="SystemData",relPath=os.path.join(pwd,"conf/experiment_config.json"),dataOut="keyantrophics"):
|
|
configPath=os.path.join(os.getcwd(),relPath)
|
|
with open(configPath, 'r', encoding='utf-8') as file:
|
|
config = json.load(file)[nameModel]
|
|
Output= config[dataOut]
|
|
return Output
|
|
|
|
keyanthropic=extractConfig(nameModel="SystemData",dataOut="keyantrophics")
|
|
|
|
class CustomClaudeOpus(DeepEvalBaseLLM):
|
|
def __init__(self):
|
|
self.model = Anthropic(api_key=keyanthropic)
|
|
|
|
def load_model(self):
|
|
return self.model
|
|
|
|
def generate(self, prompt: str, schema: BaseModel) -> BaseModel:
|
|
client = self.load_model()
|
|
instructor_client = instructor.from_anthropic(client)
|
|
resp = instructor_client.messages.create(
|
|
model="claude-3-5-sonnet-20240620",
|
|
max_tokens=1024,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": prompt,
|
|
}
|
|
],
|
|
response_model=schema,
|
|
)
|
|
return resp
|
|
|
|
async def a_generate(self, prompt: str, schema: BaseModel) -> BaseModel:
|
|
return self.generate(prompt, schema)
|
|
|
|
def get_model_name(self):
|
|
return "Claude-3.5 sonnet"
|
|
customModel=CustomClaudeOpus()
|
|
|
|
def BiasMetric22(input,actual_output):
|
|
metric = BiasMetric(threshold=0.5,model=customModel)
|
|
|
|
test_case = LLMTestCase(
|
|
input=input,
|
|
actual_output=actual_output
|
|
)
|
|
metric.measure(test_case)
|
|
return {"score":metric.score,"reason":metric.reason}
|
|
|
|
def RelevanceMetric(input,actual_output):
|
|
# Replace this with the actual output from your LLM application
|
|
metric = AnswerRelevancyMetric(
|
|
threshold=0.7,
|
|
model=customModel,
|
|
include_reason=True
|
|
)
|
|
test_case = LLMTestCase(
|
|
input=input,
|
|
actual_output=actual_output
|
|
)
|
|
|
|
metric.measure(test_case)
|
|
return {"score":metric.score,"reason":metric.reason}
|
|
|
|
|
|
|
|
|
|
|
|
def ToxicMetric(input,actual_output):
|
|
metric = ToxicityMetric(threshold=0.5,model=customModel)
|
|
test_case = LLMTestCase(
|
|
input=input,
|
|
actual_output=actual_output
|
|
)
|
|
metric.measure(test_case)
|
|
print(metric.score,"toxic")
|
|
return {"score":metric.score,"reason":metric.reason}
|
|
|
|
|
|
|
|
def correctnessMetric(input,actual_output,expected_output,criteria="Determine that the output is a json whose keys contain with compra and the data correspond to the input",evaluation_steps=["Check whether the facts in 'actual output' contradicts any facts in 'expected output'","You should also heavily penalize omission of detail","Vague language, or contradicting OPINIONS, are OK" ]):
|
|
correctness_metric = GEval(
|
|
name="Correctness",
|
|
model=customModel,
|
|
criteria=criteria,
|
|
# NOTE: you can only provide either criteria or evaluation_steps, and not both
|
|
#evaluation_steps=evaluation_steps,
|
|
evaluation_params=[LLMTestCaseParams.INPUT, LLMTestCaseParams.ACTUAL_OUTPUT]
|
|
)
|
|
test_case = LLMTestCase(
|
|
input=input,
|
|
actual_output=actual_output,
|
|
expected_output=expected_output
|
|
)
|
|
|
|
correctness_metric.measure(test_case)
|
|
return {"score":correctness_metric.score,"reason":correctness_metric.reason}
|
|
|
|
def jsonMetrics(text,Trusted):
|
|
false=False
|
|
print(type(text),type(Trusted))
|
|
try:
|
|
A=json.loads(text)
|
|
jsonOk=1
|
|
except:
|
|
jsonOk=0
|
|
print(jsonOk)
|
|
if jsonOk==1:
|
|
|
|
try:
|
|
Trus=json.loads(Trusted)
|
|
except:
|
|
Trus=Trusted
|
|
print(11111,3333,Trus)
|
|
# print(type(A),type(json.loads(Trus)))
|
|
# ddiff = DeepDiff(A, Trus)
|
|
# print(5555,ddiff)
|
|
# affectedkeys=ddiff.affected_root_keys/len(A.keys())
|
|
# keys=set(json.loads(Trusted).keys())
|
|
# jsonkeys=set(A.keys())
|
|
# TotKey=len(keys.intersection(jsonkeys))/len(keys)
|
|
# keyplus=jsonkeys.intersection(keys)
|
|
# else:
|
|
# TotKey=0
|
|
# keyplus=0
|
|
# affectedkeys=0
|
|
|
|
return {"jsonOk":jsonOk}#,"TotKey":TotKey,"keyplus":keyplus,"affectedkeys":affectedkeys}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|