newsbot-api/internal/services/cron/scheduler.go

163 lines
3.6 KiB
Go

package cron
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"

	_ "github.com/lib/pq"
	"github.com/robfig/cron/v3"

	"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
	"git.jamestombleson.com/jtom38/newsbot-api/internal/services"
)
// Cron bundles the job timer with the dependencies that the scheduled jobs
// use. Instances are built by NewScheduler.
type Cron struct {
// ctx points at the context handed to NewScheduler; methods dereference it
// when they call into the repository.
ctx *context.Context
// timer is the robfig/cron scheduler that Start and Stop operate on.
timer *cron.Cron
// repo is the repository service created from the database connection.
repo services.RepositoryService
//queues services
}
// NewScheduler builds a Cron and registers one timer job for every input
// backend that is switched on in the environment configuration returned by
// services.GetEnvConfig. The timer is only populated here, not started; call
// Start to begin running jobs.
//
// ctx is stored (by pointer) on the returned Cron and conn is passed to
// services.NewRepositoryService to build the repository the jobs use.
//
// A backend whose schedule cannot be registered is logged as a failure and
// skipped instead of being reported as enabled.
func NewScheduler(ctx context.Context, conn *sql.DB) *Cron {
	c := &Cron{
		ctx:  &ctx,
		repo: services.NewRepositoryService(conn),
	}

	timer := cron.New()

	// register adds job to the timer under the cron expression spec and logs
	// the outcome, so a rejected expression is never reported as "enabled".
	register := func(backend, spec string, job func()) {
		if _, err := timer.AddFunc(spec, job); err != nil {
			log.Printf("[Input] %s backend could not be scheduled with %q: %v", backend, spec, err)
			return
		}
		log.Printf("[Input] %s backend was enabled", backend)
	}

	//timer.AddFunc("*/5 * * * *", func() { go CheckCache() })
	features := services.GetEnvConfig()

	// Each job only launches its check in a new goroutine and returns at once.
	if features.RedditEnabled {
		register("Reddit", "5 1-23 * * *", func() { go c.CheckReddit() })
		//go c.CheckReddit()
	}
	if features.YoutubeEnabled {
		register("YouTube", "10 1-23 * * *", func() { go c.CheckYoutube() })
	}
	if features.FfxivEnabled {
		register("FFXIV", "5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
	}
	if features.TwitchEnabled {
		register("Twitch", "15 1-23 * * *", func() { go c.CheckTwitch() })
	}

	//timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() })
	//log.Print("[Output] Discord Output was enabled")

	c.timer = timer
	return c
}
// Start calls Start on the underlying cron timer that NewScheduler populated.
func (c *Cron) Start() {
c.timer.Start()
}
// Stop calls Stop on the underlying cron timer.
//
// NOTE(review): robfig/cron v3's Stop appears to return a context that can be
// used to wait for running jobs; nothing is captured here, so this method does
// not wait for them — confirm that is intended.
func (c *Cron) Stop() {
c.timer.Stop()
}
/* TODO move to the sqlite queue
func (c *Cron) CheckDiscordQueue() error {
// Get items from the table
queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50)
if err != nil {
return err
}
for _, queue := range queueItems {
// Get the articleByID
article, err := c.Db.GetArticleByID(*c.ctx, queue.Articleid)
if err != nil {
return err
}
var endpoints []string
// List Subscription by SourceID
subs, err := c.Db.ListSubscriptionsBySourceId(*c.ctx, article.Sourceid)
if err != nil {
return err
}
// if no one is subscribed to it, remove it from the index.
if len(subs) == 0 {
log.Printf("No subscriptions found bound to '%v' so it was removed.", article.Sourceid)
err = c.Db.DeleteDiscordQueueItem(*c.ctx, queue.ID)
if err != nil {
return err
}
continue
}
// Get the webhooks to send to
for _, sub := range subs {
webhook, err := c.Db.GetDiscordWebHooksByID(*c.ctx, sub.Discordwebhookid)
if err != nil {
return err
}
// store them in an array
endpoints = append(endpoints, webhook.Url)
}
// Create Discord Message
dwh := output.NewDiscordWebHookMessage(article)
msg, err := dwh.GeneratePayload()
if err != nil {
return err
}
// Send Message(s)
for _, i := range endpoints {
err = dwh.SendPayload(msg, i)
if err != nil {
return err
}
}
// Remove the item from the queue, given we sent our notification.
err = c.Db.DeleteDiscordQueueItem(*c.ctx, queue.ID)
if err != nil {
return err
}
time.Sleep(10 * time.Second)
}
return nil
}
*/
// SaveNewArticles stores every post whose URL is not yet present in the
// article repository.
//
// posts are handled in order. A post is skipped when Articles.GetByUrl returns
// no error for its URL; otherwise it is handed to Articles.Create. sourceName
// is used only to label the errors this method returns.
//
// The first failure stops the loop and is returned; posts processed before it
// are left as they are. Returned errors wrap their cause with %w, so
// errors.Is and errors.As keep working for callers.
func (c Cron) SaveNewArticles(posts []domain.ArticleEntity, sourceName string) error {
	for _, item := range posts {
		// NOTE(review): every GetByUrl error, not only "not found", falls
		// through to Create below — confirm which error the repository
		// returns for a missing row before tightening this check.
		if _, err := c.repo.Articles.GetByUrl(*c.ctx, item.Url); err == nil {
			// This URL is already known, so skip it.
			continue
		}

		// Load the new article into the repository.
		rows, err := c.repo.Articles.Create(*c.ctx, item.SourceID, item.Tags, item.Title, item.Url, item.Thumbnail, item.Description, item.AuthorName, item.AuthorImageUrl, item.PubDate, item.IsVideo)
		if err != nil {
			return fmt.Errorf("saving article %q from source %q: %w", item.Url, sourceName, err)
		}
		if rows != 1 {
			// Create reported success but did not affect exactly one record.
			return fmt.Errorf("saving article %q from source %q: %w", item.Url, sourceName,
				errors.New("failed to create a new record for some reason"))
		}
	}
	return nil
}