From a4fed88b8ac17c8c6b85b10893e8641daf27f5fa Mon Sep 17 00:00:00 2001 From: AzenKain Date: Thu, 7 May 2026 11:38:18 +0700 Subject: [PATCH] feat: implement RAG chatbot service, background cron worker, and asynchronous indexing infrastructure --- cmd/worker/cron/main.go | 1 - cmd/worker/rag/main.go | 6 ------ internal/controllers/chatbotController.go | 1 - internal/repositories/rasterTileRepository.go | 1 - internal/repositories/statisticRepo.go | 1 - internal/repositories/tileRepository.go | 1 - internal/services/chatbotService.go | 2 -- 7 files changed, 13 deletions(-) diff --git a/cmd/worker/cron/main.go b/cmd/worker/cron/main.go index 70c39f5..1aaf11b 100644 --- a/cmd/worker/cron/main.go +++ b/cmd/worker/cron/main.go @@ -35,7 +35,6 @@ func runBackup(ctx context.Context, s3 storage.Storage, dbURI string) { fileName := fmt.Sprintf("db_backup_%s.sql", time.Now().Format("2006-01-02_15-04-05")) filePath := filepath.Join(tmpDir, fileName) - // Run pg_dump cmd := exec.Command("pg_dump", dbURI, "-F", "c", "-f", filePath) if err := cmd.Run(); err != nil { log.Error().Err(err).Msg("Failed to execute pg_dump. Make sure pg_dump is installed.") diff --git a/cmd/worker/rag/main.go b/cmd/worker/rag/main.go index 7427b5e..fcfb298 100644 --- a/cmd/worker/rag/main.go +++ b/cmd/worker/rag/main.go @@ -40,7 +40,6 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag } } - // 2. Index wikis with delay + retry for _, wiki := range task.Wikis { if wiki.Source != "inline" { continue @@ -83,7 +82,6 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag continue } - // Delete existing chunks then save new ones _ = ragRepo.DeleteBySourceIDs(ctx, "wiki", []string{wiki.ID}) for i, chunk := range chunks { if saveErr := ragRepo.SaveChunk(ctx, "wiki", wiki.ID, task.ProjectID, i, chunk, vectors[i]); saveErr != nil { @@ -92,12 +90,9 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag } log.Info().Str("worker", workerName).Str("wiki_id", wiki.ID).Int("chunks", len(chunks)).Msg("Wiki indexed successfully") - - // Delay between items to avoid rate limit time.Sleep(itemDelay) } - // 3. Index entities with delay + retry for _, entity := range task.Entities { if entity.Source != "inline" { continue @@ -140,7 +135,6 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag continue } - // Delete existing chunks then save new ones _ = ragRepo.DeleteBySourceIDs(ctx, "entity", []string{entity.ID}) for i, chunk := range chunks { if saveErr := ragRepo.SaveChunk(ctx, "entity", entity.ID, task.ProjectID, i, chunk, vectors[i]); saveErr != nil { diff --git a/internal/controllers/chatbotController.go b/internal/controllers/chatbotController.go index 0548f69..a63493f 100644 --- a/internal/controllers/chatbotController.go +++ b/internal/controllers/chatbotController.go @@ -49,7 +49,6 @@ func (cx *ChatbotController) Chat(c fiber.Ctx) error { answer, err := cx.chatbotService.Chat(ctx, claims.UId, dto.ProjectID, dto.Question) if err != nil { - // Trả về lỗi 429 (Too Many Requests) nếu hết lượt dùng if err.Error() == "you have reached your daily limit of 10 questions. Please come back tomorrow" { return c.Status(fiber.StatusTooManyRequests).JSON(response.CommonResponse{ Status: false, diff --git a/internal/repositories/rasterTileRepository.go b/internal/repositories/rasterTileRepository.go index 3ae4f86..c6b1b9f 100644 --- a/internal/repositories/rasterTileRepository.go +++ b/internal/repositories/rasterTileRepository.go @@ -61,7 +61,6 @@ func (r *rasterTileRepository) GetTile(ctx context.Context, z, x, y int) ([]byte return nil, "", fmt.Errorf("invalid tile coordinates") } - // cache key cacheId := fmt.Sprintf("rasterTile:%d:%d:%d", z, x, y) var cached []byte diff --git a/internal/repositories/statisticRepo.go b/internal/repositories/statisticRepo.go index 62419e8..85e9394 100644 --- a/internal/repositories/statisticRepo.go +++ b/internal/repositories/statisticRepo.go @@ -224,7 +224,6 @@ func (r *statisticRepository) Upsert(ctx context.Context, date time.Time) (*mode entity := mapToEntity(row) - // Clear search cache and the specific date cache go func() { bgCtx := context.Background() _ = r.c.DelByPattern(bgCtx, "statistic:search*") diff --git a/internal/repositories/tileRepository.go b/internal/repositories/tileRepository.go index 91dd7e9..5739a2e 100644 --- a/internal/repositories/tileRepository.go +++ b/internal/repositories/tileRepository.go @@ -61,7 +61,6 @@ func (r *tileRepository) GetTile(ctx context.Context, z, x, y int) ([]byte, stri return nil, "", false, fmt.Errorf("invalid tile coordinates") } - // cache key cacheId := fmt.Sprintf("tile:%d:%d:%d", z, x, y) var cached []byte diff --git a/internal/services/chatbotService.go b/internal/services/chatbotService.go index e6a3754..5dfb663 100644 --- a/internal/services/chatbotService.go +++ b/internal/services/chatbotService.go @@ -78,8 +78,6 @@ Question: %s`, contextStr, question) if err != nil { return "", err } - - // 3. Tăng số lần sử dụng sau khi gọi AI thành công _, _ = s.usageRepo.IncrementAIUsage(ctx, userID) return response, nil