package cron import ( "context" "database/sql" "errors" "log" _ "github.com/lib/pq" "github.com/robfig/cron/v3" "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" "git.jamestombleson.com/jtom38/newsbot-api/internal/services" ) type Cron struct { ctx *context.Context timer *cron.Cron repo services.RepositoryService //queues services } func NewScheduler(ctx context.Context, conn *sql.DB) *Cron { c := &Cron{ ctx: &ctx, repo: services.NewRepositoryService(conn), } timer := cron.New() //timer.AddFunc("*/5 * * * *", func() { go CheckCache() }) features := services.GetEnvConfig() if features.RedditEnabled { timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() }) log.Print("[Input] Reddit backend was enabled") //go c.CheckReddit() } if features.YoutubeEnabled { timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() }) log.Print("[Input] YouTube backend was enabled") } if features.FfxivEnabled { timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() }) log.Print("[Input] FFXIV backend was enabled") } if features.TwitchEnabled { timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() }) log.Print("[Input] Twitch backend was enabled") } //timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() }) //log.Print("[Output] Discord Output was enabled") c.timer = timer return c } func (c *Cron) Start() { c.timer.Start() } func (c *Cron) Stop() { c.timer.Stop() } /* TODO move to the sqlite queue func (c *Cron) CheckDiscordQueue() error { // Get items from the table queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50) if err != nil { return err } for _, queue := range queueItems { // Get the articleByID article, err := c.Db.GetArticleByID(*c.ctx, queue.Articleid) if err != nil { return err } var endpoints []string // List Subscription by SourceID subs, err := c.Db.ListSubscriptionsBySourceId(*c.ctx, article.Sourceid) if err != nil { return err } // if no one is subscribed to it, remove it from the index. if len(subs) == 0 { log.Printf("No subscriptions found bound to '%v' so it was removed.", article.Sourceid) err = c.Db.DeleteDiscordQueueItem(*c.ctx, queue.ID) if err != nil { return err } continue } // Get the webhhooks to send to for _, sub := range subs { webhook, err := c.Db.GetDiscordWebHooksByID(*c.ctx, sub.Discordwebhookid) if err != nil { return err } // store them in an array endpoints = append(endpoints, webhook.Url) } // Create Discord Message dwh := output.NewDiscordWebHookMessage(article) msg, err := dwh.GeneratePayload() if err != nil { return err } // Send Message(s) for _, i := range endpoints { err = dwh.SendPayload(msg, i) if err != nil { return err } } // Remove the item from the queue, given we sent our notification. err = c.Db.DeleteDiscordQueueItem(*c.ctx, queue.ID) if err != nil { return err } time.Sleep(10 * time.Second) } return nil } */ func (c Cron) SaveNewArticles(posts []domain.ArticleEntity, sourceName string) error { for _, item := range posts { _, err := c.repo.Articles.GetByUrl(*c.ctx, item.Url) if err == nil { // This url is already known, so skip it continue } // Load the new article in the repository rows, err := c.repo.Articles.Create(*c.ctx, item.SourceID, item.Tags, item.Title, item.Url, item.Thumbnail, item.Description, item.AuthorName, item.AuthorImageUrl, item.PubDate, item.IsVideo) if err != nil { return err } if rows != 1 { return errors.New("failed to create a new record for some reason") } } return nil }