# Source code for efmc.llmtools.llm_utils

"""LLM utilities for online inference using various LLM providers."""

import concurrent.futures
import os
import time
from typing import Tuple

import tiktoken
from openai import OpenAI

from efmc.llmtools.logger import Logger


class LLM:
    """Online inference wrapper around hosted LLM providers.

    Supported providers, selected by substring match on the model name:
        - DeepSeek (V3, R1, ...): OpenAI-compatible API
        - GLM: Zhipu AI models (requires the optional ``zhipuai`` package)
    """

    # Number of attempts made against a provider before giving up.
    MAX_RETRIES = 5

    def __init__(
        self,
        online_model_name: str,
        logger: Logger,
        temperature: float = 0.0,
        system_role: str = (
            "You are an experienced programmer and good at understanding "
            "programs written in mainstream programming languages."
        ),
    ) -> None:
        """
        Initialize LLM instance.

        Args:
            online_model_name: Name of the online model
            logger: Logger instance
            temperature: Temperature for inference
            system_role: System role message
        """
        self.online_model_name = online_model_name
        # We only use the gpt-3.5 tokenizer to measure token cost; it is an
        # approximation for non-OpenAI models.
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo-0125")
        self.temperature = temperature
        self.system_role = system_role
        self.logger = logger

    def infer(
        self, message: str, is_measure_cost: bool = False
    ) -> Tuple[str, int, int]:
        """
        Infer using the online model.

        Args:
            message: Input message
            is_measure_cost: Whether to measure token cost

        Returns:
            Tuple of (output, input_token_cost, output_token_cost)

        Raises:
            ValueError: If the model name is not a DeepSeek or GLM model.
        """
        self.logger.print_log(self.online_model_name, "is running")
        if "deepseek" in self.online_model_name:
            output = self.infer_with_deepseek_model(message)
        elif "glm" in self.online_model_name:
            output = self.infer_with_glm_model(message)
        else:
            raise ValueError(
                "Unsupported model name. Only DeepSeek and GLM models are supported."
            )

        if is_measure_cost:
            input_token_cost = len(self.encoding.encode(self.system_role)) + len(
                self.encoding.encode(message)
            )
            output_token_cost = len(self.encoding.encode(output))
        else:
            input_token_cost = 0
            output_token_cost = 0

        return output, input_token_cost, output_token_cost

    def run_with_timeout(self, func, timeout_seconds):
        """
        Run a function with a timeout that works in multiple threads.

        Args:
            func: Function to execute
            timeout_seconds: Timeout in seconds

        Returns:
            Function result, or empty string on timeout/error
        """
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(func)
            try:
                return future.result(timeout=timeout_seconds)
            except concurrent.futures.TimeoutError:
                self.logger.print_log("Operation timed out")
                return ""
            except (ValueError, RuntimeError, ConnectionError) as e:
                self.logger.print_log(f"Operation failed: {e}")
                return ""
        finally:
            # Do NOT wait for the worker thread: the context-manager form of
            # ThreadPoolExecutor calls shutdown(wait=True), which would block
            # here until the hung API call returns, defeating the timeout.
            executor.shutdown(wait=False)

    def _call_with_retries(self, call_api, timeout_seconds, error_prefix):
        """Invoke *call_api* up to MAX_RETRIES times with a per-call timeout.

        Args:
            call_api: Zero-argument callable performing the provider request.
            timeout_seconds: Timeout applied to each individual attempt.
            error_prefix: Prefix used when logging provider errors.

        Returns:
            The first non-empty output, or empty string if all attempts fail.
        """
        for _ in range(self.MAX_RETRIES):
            try:
                output = self.run_with_timeout(
                    call_api, timeout_seconds=timeout_seconds
                )
                if output:
                    return output
            except (ValueError, RuntimeError, ConnectionError) as e:
                self.logger.print_log(f"{error_prefix}: {e}")
            # Brief back-off before the next attempt.
            time.sleep(2)
        return ""

    def infer_with_deepseek_model(self, message):
        """
        Infer using the DeepSeek model (V3, R1, etc.)
        DeepSeek uses the OpenAI-compatible API format.

        Args:
            message: Input message

        Returns:
            Model output or empty string on failure
        """
        api_key = os.environ.get("DEEPSEEK_API_KEY")
        if not api_key:
            self.logger.print_log("DeepSeek API key not found in environment variables")
            return ""

        model_input = [
            {"role": "system", "content": self.system_role},
            {"role": "user", "content": message},
        ]

        def call_api():
            client = OpenAI(api_key=api_key, base_url="https://api.deepseek.com/v1")
            response = client.chat.completions.create(
                model=self.online_model_name,
                messages=model_input,
                temperature=self.temperature,
            )
            return response.choices[0].message.content

        return self._call_with_retries(
            call_api, timeout_seconds=300, error_prefix="DeepSeek API error"
        )

    def infer_with_glm_model(self, message):
        """
        Infer using the GLM model.

        Args:
            message: Input message

        Returns:
            Model output or empty string on failure
        """
        try:
            from zhipuai import ZhipuAI  # pylint: disable=import-outside-toplevel
        except ImportError:
            self.logger.print_log(
                "zhipuai package not installed. Please install it to use GLM models."
            )
            return ""

        api_key = os.environ.get("GLM_API_KEY")
        if not api_key:
            self.logger.print_log("GLM API key not found in environment variables")
            return ""

        model_input = [
            {"role": "system", "content": self.system_role},
            {"role": "user", "content": message},
        ]

        def call_api():
            client = ZhipuAI(api_key=api_key)
            response = client.chat.completions.create(
                model=self.online_model_name,
                messages=model_input,
                temperature=self.temperature,
            )
            return response.choices[0].message.content

        return self._call_with_retries(
            call_api, timeout_seconds=100, error_prefix="API error"
        )