Files
qiweimanager-master/helper/auto_reply_status.go

445 lines
17 KiB
Go

package main
import (
"strings"
"sync"
"time"
"qiweimanager/config"
)
const (
autoReplyRecordLimit = 100
autoReplyErrorScopeListen = "listen"
autoReplyErrorScopeAI = "ai"
autoReplyErrorScopeKnowledge = "knowledge"
autoReplyErrorScopeHandoff = "handoff"
autoReplyErrorScopeIdentity = "identity"
autoReplyErrorScopeRecords = "records"
)
type AutoReplyRecord struct {
ID int64 `json:"id"`
Time string `json:"time"`
RobotID string `json:"robotId"`
ClientID int32 `json:"clientId"`
UserID string `json:"userId"`
ConversationID string `json:"conversationId"`
Source string `json:"source"`
FromWxID string `json:"fromWxId"`
FromNickName string `json:"fromNickName"`
Question string `json:"question"`
Action string `json:"action"`
Reason string `json:"reason"`
Answer string `json:"answer"`
SenderIdentity string `json:"senderIdentity"`
IdentitySource string `json:"identitySource"`
CardStatus string `json:"cardStatus"`
Score float64 `json:"score"`
KeywordScore float64 `json:"keywordScore"`
VectorScore float64 `json:"vectorScore"`
RerankScore float64 `json:"rerankScore"`
RetrievalMode string `json:"retrievalMode"`
UsedKnowledgeSources string `json:"usedKnowledgeSources"`
KnowledgeDurationMS int64 `json:"knowledgeDurationMs"`
KeywordDurationMS int64 `json:"keywordDurationMs"`
VectorDurationMS int64 `json:"vectorDurationMs"`
RerankDurationMS int64 `json:"rerankDurationMs"`
AIDurationMS int64 `json:"aiDurationMs"`
TotalDurationMS int64 `json:"totalDurationMs"`
}
type AutoReplyStatus struct {
Enabled bool `json:"enabled"`
Running bool `json:"running"`
LastError string `json:"lastError"`
LastErrorScope string `json:"lastErrorScope"`
KnowledgeFileCount int `json:"knowledgeFileCount"`
KnowledgeChunkCount int `json:"knowledgeChunkCount"`
KnowledgeLastIndexedAt int64 `json:"knowledgeLastIndexedAt"`
KnowledgeFailedFiles []string `json:"knowledgeFailedFiles"`
RetrievalMode string `json:"retrievalMode"`
EmbeddingChunkCount int `json:"embeddingChunkCount"`
EmbeddingModel string `json:"embeddingModel"`
EmbeddingDimensions int `json:"embeddingDimensions"`
EmbeddingLastIndexedAt int64 `json:"embeddingLastIndexedAt"`
InternalContactCount int `json:"internalContactCount"`
ExternalContactCount int `json:"externalContactCount"`
IdentityLastRefreshAt int64 `json:"identityLastRefreshAt"`
IdentityRefreshError string `json:"identityRefreshError"`
IdentityRefreshing bool `json:"identityRefreshing"`
IdentityLastResponseType string `json:"identityLastResponseType"`
IdentityLastResponseCount int `json:"identityLastResponseCount"`
IdentityLastResponseAt int64 `json:"identityLastResponseAt"`
IdentityLookupInFlight int `json:"identityLookupInFlight"`
IdentityInitializing bool `json:"identityInitializing"`
IdentityInitializedAt int64 `json:"identityInitializedAt"`
IdentityScope string `json:"identityScope"`
IdentityGroupOptionCount int `json:"identityGroupOptionCount"`
InternalGroupMemberLastSyncAt int64 `json:"internalGroupMemberLastSyncAt"`
InternalGroupMemberLastSyncCount int `json:"internalGroupMemberLastSyncCount"`
InternalGroupMemberSyncError string `json:"internalGroupMemberSyncError"`
RobotUserIDs []string `json:"robotUserIds"`
TodayReceived int `json:"todayReceived"`
TodayReplied int `json:"todayReplied"`
TodayHandoff int `json:"todayHandoff"`
TodayIgnored int `json:"todayIgnored"`
TodayAIFailed int `json:"todayAIFailed"`
LastKnowledgeDurationMS int64 `json:"lastKnowledgeDurationMs"`
LastKeywordDurationMS int64 `json:"lastKeywordDurationMs"`
LastVectorDurationMS int64 `json:"lastVectorDurationMs"`
LastRerankDurationMS int64 `json:"lastRerankDurationMs"`
LastAIDurationMS int64 `json:"lastAiDurationMs"`
LastTotalDurationMS int64 `json:"lastTotalDurationMs"`
LastKeywordScore float64 `json:"lastKeywordScore"`
LastVectorScore float64 `json:"lastVectorScore"`
LastRerankScore float64 `json:"lastRerankScore"`
ReasonCounts map[string]int `json:"reasonCounts"`
LastMessages []AutoReplyRecord `json:"lastMessages"`
HumanAssistPendingCount int `json:"humanAssistPendingCount"`
HumanAssistObservedCount int `json:"humanAssistObservedCount"`
CollaborationWaitingCount int `json:"collaborationWaitingCount"`
CollaborationTakeoverCount int `json:"collaborationTakeoverCount"`
TodayCollaborationSupplemented int `json:"todayCollaborationSupplemented"`
TodayCollaborationTakeovers int `json:"todayCollaborationTakeovers"`
}
type AutoReplyEngine struct {
mu sync.Mutex
config config.AutoReplyConfig
queue chan AutoReplyJob
dedupe map[string]time.Time
cooldowns map[string]time.Time
groupNames map[string]string
accountNames map[int32][]string
identityCaches map[int32]*autoReplyIdentityCache
identityLookups map[string]time.Time
identityGroups map[int32]map[string]autoReplyGroupOption
identityWait bool
contextEntries map[string][]autoReplyContextEntry
humanPending map[string]*humanAssistPending
collaborations map[string]*collaborationSession
autoSent map[string]time.Time
records []AutoReplyRecord
nextRecordID int64
status AutoReplyStatus
startedAt time.Time
enabledAt time.Time
index *KnowledgeIndex
embeddingIndex *EmbeddingIndex
}
type AutoReplyJob struct {
ClientID int32
RawData map[string]interface{}
ReceivedAt time.Time
SkipHumanAssist bool
SkipCollaboration bool
ForceNoCooldown bool
SupplementReason string
}
type autoReplyTimings struct {
KnowledgeDurationMS int64
KeywordDurationMS int64
VectorDurationMS int64
RerankDurationMS int64
AIDurationMS int64
TotalDurationMS int64
KeywordScore float64
VectorScore float64
RerankScore float64
RetrievalMode string
UsedKnowledgeSources []string
}
var autoReplyEngine *AutoReplyEngine
func initAutoReplyEngine() {
cfg := config.NewDefaultAutoReplyConfig()
if appConfig := config.GetGlobalConfig(); appConfig != nil {
appConfig.ApplyDefaults()
cfg = appConfig.AutoReplyConfig
}
now := time.Now()
autoReplyEngine = &AutoReplyEngine{
config: cfg,
queue: make(chan AutoReplyJob, 200),
dedupe: make(map[string]time.Time),
cooldowns: make(map[string]time.Time),
groupNames: make(map[string]string),
accountNames: make(map[int32][]string),
identityCaches: make(map[int32]*autoReplyIdentityCache),
identityLookups: make(map[string]time.Time),
identityGroups: make(map[int32]map[string]autoReplyGroupOption),
contextEntries: make(map[string][]autoReplyContextEntry),
humanPending: make(map[string]*humanAssistPending),
collaborations: make(map[string]*collaborationSession),
autoSent: make(map[string]time.Time),
status: AutoReplyStatus{
Enabled: cfg.Enabled,
Running: cfg.Enabled,
RetrievalMode: cfg.Retrieval.RetrievalMode,
EmbeddingModel: cfg.Retrieval.EmbeddingModel,
EmbeddingDimensions: cfg.Retrieval.EmbeddingDimensions,
ReasonCounts: make(map[string]int),
},
startedAt: now,
enabledAt: autoReplyEnabledAt(cfg.Enabled, now),
index: NewKnowledgeIndex(),
embeddingIndex: NewEmbeddingIndex(cfg.Retrieval.EmbeddingModel, cfg.Retrieval.EmbeddingDimensions),
}
if err := autoReplyEngine.loadKnowledgeIndex(); err != nil {
autoReplyEngine.setLastErrorWithScope(autoReplyErrorScopeKnowledge, err.Error())
}
if err := autoReplyEngine.loadEmbeddingIndex(); err != nil {
autoReplyEngine.setLastErrorWithScope(autoReplyErrorScopeKnowledge, err.Error())
}
if err := autoReplyEngine.loadIdentityCache(); err != nil {
autoReplyEngine.setLastErrorWithScope(autoReplyErrorScopeIdentity, "身份缓存加载失败: "+err.Error())
}
if err := autoReplyEngine.loadContextCache(); err != nil {
autoReplyEngine.setLastErrorWithScope(autoReplyErrorScopeRecords, "conversation context load failed: "+err.Error())
}
if cfg.Knowledge.AutoRebuildOnStart {
go func() {
if _, err := autoReplyEngine.rebuildKnowledgeIndex(); err != nil {
autoReplyEngine.setLastErrorWithScope(autoReplyErrorScopeKnowledge, err.Error())
}
}()
}
if cfg.Identity.RefreshOnStart {
autoReplyEngine.refreshIdentityContactsAsync("startup")
}
go autoReplyEngine.identityRefreshLoop()
go autoReplyEngine.collaborationSweepLoop()
go autoReplyEngine.worker()
}
func getAutoReplyEngine() *AutoReplyEngine {
if autoReplyEngine == nil {
initAutoReplyEngine()
}
return autoReplyEngine
}
func (e *AutoReplyEngine) reloadConfig() {
appConfig, err := config.ReloadGlobalConfig()
if err != nil {
e.setLastErrorWithScope(autoReplyErrorScopeListen, err.Error())
return
}
if appConfig == nil {
return
}
appConfig.ApplyDefaults()
e.mu.Lock()
wasEnabled := e.config.Enabled
e.config = appConfig.AutoReplyConfig
e.status.Enabled = e.config.Enabled
e.status.Running = e.config.Enabled
if e.config.Enabled && !wasEnabled {
e.enabledAt = time.Now()
} else if !e.config.Enabled {
e.enabledAt = time.Time{}
} else if e.config.Enabled && e.enabledAt.IsZero() {
e.enabledAt = time.Now()
}
e.status.RetrievalMode = e.config.Retrieval.RetrievalMode
e.status.EmbeddingModel = e.config.Retrieval.EmbeddingModel
e.status.EmbeddingDimensions = e.config.Retrieval.EmbeddingDimensions
if hasManualIdentityFallback(e.config.Identity) && isIdentityEmptyCacheWarning(e.status.IdentityRefreshError) {
e.status.IdentityRefreshError = ""
}
e.mu.Unlock()
if err := e.loadKnowledgeIndex(); err != nil {
e.setLastErrorWithScope(autoReplyErrorScopeKnowledge, err.Error())
}
if err := e.loadEmbeddingIndex(); err != nil {
e.setLastErrorWithScope(autoReplyErrorScopeKnowledge, err.Error())
}
if err := e.loadIdentityCache(); err != nil {
e.setLastErrorWithScope(autoReplyErrorScopeIdentity, "身份缓存加载失败: "+err.Error())
}
if err := e.loadContextCache(); err != nil {
e.setLastErrorWithScope(autoReplyErrorScopeRecords, "conversation context load failed: "+err.Error())
}
if e.config.Identity.RefreshOnStart {
e.refreshIdentityContactsAsync("reload")
}
}
func autoReplyEnabledAt(enabled bool, fallback time.Time) time.Time {
if !enabled {
return time.Time{}
}
if fallback.IsZero() {
return time.Now()
}
return fallback
}
func (e *AutoReplyEngine) snapshotStatus() AutoReplyStatus {
e.mu.Lock()
defer e.mu.Unlock()
status := e.status
status.LastMessages = append([]AutoReplyRecord(nil), e.records...)
status.KnowledgeFailedFiles = append([]string(nil), e.status.KnowledgeFailedFiles...)
status.ReasonCounts = copyStringIntMap(e.status.ReasonCounts)
status.RobotUserIDs = knownRobotUserIDsSnapshot()
status.IdentityScope = e.currentIdentityScope()
status.HumanAssistPendingCount = len(e.humanPending)
waiting, takeover := e.collaborationCountsLocked()
status.CollaborationWaitingCount = waiting
status.CollaborationTakeoverCount = takeover
return status
}
func (e *AutoReplyEngine) setLastError(msg string) {
e.setLastErrorWithScope(inferAutoReplyErrorScope(msg), msg)
}
func (e *AutoReplyEngine) setLastErrorWithScope(scope string, msg string) {
scope = normalizeAutoReplyErrorScope(scope)
if scope == "" {
scope = inferAutoReplyErrorScope(msg)
}
e.mu.Lock()
e.status.LastError = msg
if strings.TrimSpace(msg) == "" {
e.status.LastErrorScope = ""
} else {
e.status.LastErrorScope = scope
}
e.mu.Unlock()
if msg != "" && globalLogger != nil {
globalLogger.Warn("[自动客服] %s", msg)
}
}
func normalizeAutoReplyErrorScope(scope string) string {
switch strings.TrimSpace(scope) {
case autoReplyErrorScopeListen:
return autoReplyErrorScopeListen
case autoReplyErrorScopeAI:
return autoReplyErrorScopeAI
case autoReplyErrorScopeKnowledge:
return autoReplyErrorScopeKnowledge
case autoReplyErrorScopeHandoff:
return autoReplyErrorScopeHandoff
case autoReplyErrorScopeIdentity:
return autoReplyErrorScopeIdentity
case autoReplyErrorScopeRecords:
return autoReplyErrorScopeRecords
default:
return ""
}
}
func inferAutoReplyErrorScope(msg string) string {
text := strings.TrimSpace(msg)
if text == "" {
return ""
}
lower := strings.ToLower(text)
switch {
case strings.Contains(text, "AI请求失败") || strings.Contains(text, "AI 请求失败") || strings.Contains(text, "AI 测试失败"):
return autoReplyErrorScopeAI
case strings.Contains(text, "联系人身份") || strings.Contains(text, "身份查询") ||
strings.Contains(text, "未知身份拦截回复失败") || strings.Contains(text, "内部员工拦截回复失败"):
return autoReplyErrorScopeIdentity
case strings.Contains(text, "转人工发送失败") || strings.Contains(text, "测试私信失败") ||
strings.Contains(text, "人工名片") || strings.Contains(text, "客户名片") || strings.Contains(text, "客户说明"):
return autoReplyErrorScopeHandoff
case strings.Contains(text, "知识库") || strings.Contains(text, "知识索引") ||
strings.Contains(text, "向量召回") || strings.Contains(text, "重排序") ||
strings.Contains(text, "重建失败") || strings.Contains(text, "索引") ||
strings.Contains(lower, "embedding") || strings.Contains(lower, "rerank"):
return autoReplyErrorScopeKnowledge
case strings.Contains(text, "开启失败") || strings.Contains(text, "关闭失败") ||
strings.Contains(text, "保存失败") || strings.Contains(text, "加载自动客服配置失败") ||
strings.Contains(text, "重载") || strings.Contains(text, "监听") || strings.Contains(text, "启动"):
return autoReplyErrorScopeListen
default:
return autoReplyErrorScopeRecords
}
}
func (e *AutoReplyEngine) addRecord(record AutoReplyRecord) {
e.mu.Lock()
defer e.mu.Unlock()
e.nextRecordID++
record.ID = e.nextRecordID
if record.Time == "" {
record.Time = time.Now().Format("2006-01-02 15:04:05")
}
e.records = append([]AutoReplyRecord{record}, e.records...)
if len(e.records) > autoReplyRecordLimit {
e.records = e.records[:autoReplyRecordLimit]
}
}
func (e *AutoReplyEngine) incStatus(field string) {
e.mu.Lock()
defer e.mu.Unlock()
switch field {
case "received":
e.status.TodayReceived++
case "replied":
e.status.TodayReplied++
case "handoff":
e.status.TodayHandoff++
case "ignored":
e.status.TodayIgnored++
case "ai_failed":
e.status.TodayAIFailed++
case "collaboration_supplemented":
e.status.TodayCollaborationSupplemented++
case "collaboration_takeover":
e.status.TodayCollaborationTakeovers++
}
}
func (e *AutoReplyEngine) noteReason(reason string) {
if reason == "" {
return
}
e.mu.Lock()
defer e.mu.Unlock()
if e.status.ReasonCounts == nil {
e.status.ReasonCounts = make(map[string]int)
}
e.status.ReasonCounts[reason]++
}
func (e *AutoReplyEngine) setLastDurations(timings autoReplyTimings) {
e.mu.Lock()
defer e.mu.Unlock()
e.status.LastKnowledgeDurationMS = timings.KnowledgeDurationMS
e.status.LastKeywordDurationMS = timings.KeywordDurationMS
e.status.LastVectorDurationMS = timings.VectorDurationMS
e.status.LastRerankDurationMS = timings.RerankDurationMS
e.status.LastAIDurationMS = timings.AIDurationMS
e.status.LastTotalDurationMS = timings.TotalDurationMS
}
func (e *AutoReplyEngine) setLastRetrievalScores(keywordScore float64, vectorScore float64, rerankScore float64) {
e.mu.Lock()
defer e.mu.Unlock()
e.status.LastKeywordScore = keywordScore
e.status.LastVectorScore = vectorScore
e.status.LastRerankScore = rerankScore
}
func copyStringIntMap(src map[string]int) map[string]int {
if len(src) == 0 {
return map[string]int{}
}
dst := make(map[string]int, len(src))
for key, value := range src {
dst[key] = value
}
return dst
}