feat: implement system statistics tracking, commit management controllers, and associated database migrations
All checks were successful
Build and Release / release (push) Successful in 1m49s
All checks were successful
Build and Release / release (push) Successful in 1m49s
This commit is contained in:
@@ -96,6 +96,7 @@ func (s *FiberServer) SetupServer(
|
||||
|
||||
raguRepo := repositories.NewRagRepository(poolPg, redis)
|
||||
usageRepo := repositories.NewUsageRepository(redis)
|
||||
statisticRepo := repositories.NewStatisticRepository(poolPg, redis)
|
||||
|
||||
// service setup
|
||||
authService := services.NewAuthService(userRepo, roleRepo, tokenRepo, redis, poolPg)
|
||||
@@ -116,6 +117,7 @@ func (s *FiberServer) SetupServer(
|
||||
raguRepo, raguUtils, poolPg, redis,
|
||||
)
|
||||
chatbotService := services.NewChatbotService(raguRepo, usageRepo, raguUtils)
|
||||
statisticService := services.NewStatisticService(statisticRepo)
|
||||
|
||||
// controller setup
|
||||
authController := controllers.NewAuthController(authService, oauth)
|
||||
@@ -132,6 +134,7 @@ func (s *FiberServer) SetupServer(
|
||||
commitController := controllers.NewCommitController(commitService)
|
||||
submissionController := controllers.NewSubmissionController(submissionService)
|
||||
chatbotController := controllers.NewChatbotController(chatbotService)
|
||||
statisticController := controllers.NewStatisticController(statisticService)
|
||||
|
||||
// route setup
|
||||
routes.AuthRoutes(s.App, authController, userRepo)
|
||||
@@ -147,5 +150,6 @@ func (s *FiberServer) SetupServer(
|
||||
routes.ProjectRoutes(s.App, projectController, commitController, userRepo)
|
||||
routes.SubmissionRoutes(s.App, submissionController, userRepo)
|
||||
routes.ChatbotRoutes(s.App, chatbotController, userRepo)
|
||||
routes.StatisticRoutes(s.App, statisticController, userRepo)
|
||||
routes.NotFoundRoute(s.App)
|
||||
}
|
||||
|
||||
132
cmd/worker/cron/main.go
Normal file
132
cmd/worker/cron/main.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package main
|
||||
|
||||
import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"history-api/internal/repositories"
	"history-api/pkg/cache"
	"history-api/pkg/config"
	"history-api/pkg/storage"

	"github.com/go-co-op/gocron/v2"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/rs/zerolog/log"
)
|
||||
|
||||
func runStatistics(ctx context.Context, repo repositories.StatisticRepository) {
|
||||
log.Info().Msg("Running daily statistics...")
|
||||
today := time.Now().UTC().Truncate(24 * time.Hour)
|
||||
_, err := repo.Upsert(ctx, today)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to upsert system statistics")
|
||||
} else {
|
||||
log.Info().Msg("Successfully updated daily statistics and cleared cache")
|
||||
}
|
||||
}
|
||||
|
||||
func runBackup(ctx context.Context, s3 storage.Storage, dbURI string) {
|
||||
log.Info().Msg("Running weekly database backup...")
|
||||
|
||||
tmpDir := os.TempDir()
|
||||
fileName := fmt.Sprintf("db_backup_%s.sql", time.Now().Format("2006-01-02_15-04-05"))
|
||||
filePath := filepath.Join(tmpDir, fileName)
|
||||
|
||||
// Run pg_dump
|
||||
cmd := exec.Command("pg_dump", dbURI, "-F", "c", "-f", filePath)
|
||||
if err := cmd.Run(); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to execute pg_dump. Make sure pg_dump is installed.")
|
||||
return
|
||||
}
|
||||
defer os.Remove(filePath)
|
||||
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to open backup file")
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to stat backup file")
|
||||
return
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("backups/%s", fileName)
|
||||
err = s3.Upload(ctx, key, file, stat.Size(), storage.UploadOptions{
|
||||
ContentType: "application/octet-stream",
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to upload backup to S3")
|
||||
} else {
|
||||
log.Info().Str("key", key).Msg("Successfully uploaded backup to S3")
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
config.LoadEnv()
|
||||
|
||||
dbURI, err := config.GetConfig("DATABASE_URI")
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("DATABASE_URI not set")
|
||||
}
|
||||
|
||||
dbPool, err := pgxpool.New(context.Background(), dbURI)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to connect to DB")
|
||||
}
|
||||
defer dbPool.Close()
|
||||
|
||||
redis, err := cache.NewRedisClient()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to connect to Redis")
|
||||
}
|
||||
|
||||
statisticRepo := repositories.NewStatisticRepository(dbPool, redis)
|
||||
|
||||
s3Store, err := storage.NewS3Storage()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to init S3 storage")
|
||||
}
|
||||
|
||||
log.Info().Msg("Cron worker started")
|
||||
|
||||
// Run initially on startup
|
||||
runStatistics(context.Background(), statisticRepo)
|
||||
|
||||
s, err := gocron.NewScheduler()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to create scheduler")
|
||||
}
|
||||
|
||||
// Run statistics every day at midnight (00:00)
|
||||
_, err = s.NewJob(
|
||||
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
|
||||
gocron.NewTask(func() {
|
||||
runStatistics(context.Background(), statisticRepo)
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to schedule daily statistics")
|
||||
}
|
||||
|
||||
// Run backup every Sunday at 01:00 AM
|
||||
_, err = s.NewJob(
|
||||
gocron.WeeklyJob(1, gocron.NewWeekdays(time.Sunday), gocron.NewAtTimes(gocron.NewAtTime(1, 0, 0))),
|
||||
gocron.NewTask(func() {
|
||||
runBackup(context.Background(), s3Store, dbURI)
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to schedule weekly backup")
|
||||
}
|
||||
|
||||
s.Start()
|
||||
|
||||
select {}
|
||||
}
|
||||
@@ -87,6 +87,26 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if taskType == constants.TaskTypeAdminUserAction.String() {
|
||||
var data models.AdminUserActionPayload
|
||||
if err := json.Unmarshal([]byte(payloadStr), &data); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to unmarshal payload")
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("worker", consumerName).
|
||||
Str("email", data.Email).
|
||||
Str("action", data.Action).
|
||||
Msg("Processing admin user action email task")
|
||||
|
||||
errSend := email.SendAdminUserActionMail(&data)
|
||||
if errSend != nil {
|
||||
log.Error().Err(errSend).Str("email", data.Email).Msg("Failed to send email")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rdb.XAck(ctx, constants.StreamEmailName, constants.GroupEmailName, message.ID)
|
||||
log.Info().Str("msg_id", message.ID).Msg("Task acknowledged")
|
||||
|
||||
Reference in New Issue
Block a user