From ce45b509d3c25e60ca4c33c67b54bcd204336549 Mon Sep 17 00:00:00 2001 From: James Tombleson Date: Mon, 29 Apr 2024 16:29:54 -0700 Subject: [PATCH] broke cron out and moved to the new domain models --- internal/services/cron/ffxivJob.go | 30 ++++ internal/services/cron/redditJob.go | 33 ++++ internal/services/cron/scheduler.go | 211 ++++------------------- internal/services/cron/scheduler_test.go | 55 +++++- internal/services/cron/twitchJob.go | 41 +++++ internal/services/cron/youtube.go | 32 ++++ 6 files changed, 211 insertions(+), 191 deletions(-) create mode 100644 internal/services/cron/ffxivJob.go create mode 100644 internal/services/cron/redditJob.go create mode 100644 internal/services/cron/twitchJob.go create mode 100644 internal/services/cron/youtube.go diff --git a/internal/services/cron/ffxivJob.go b/internal/services/cron/ffxivJob.go new file mode 100644 index 0000000..d672037 --- /dev/null +++ b/internal/services/cron/ffxivJob.go @@ -0,0 +1,30 @@ +package cron + +import ( + "log" + + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" + "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" +) + +func (c *Cron) CheckFfxiv() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 10, domain.SourceCollectorFfxiv) + if err != nil { + log.Printf("[FFXIV] No sources found to query - %v\r", err) + } + + for _, source := range sources { + if !source.Enabled { + continue + } + + fc := input.NewFFXIVClient(source) + items, err := fc.CheckSource() + if err != nil { + log.Println(err) + } + + c.SaveNewArticles(items, domain.SourceCollectorFfxiv) + } + log.Printf("[FFXIV Done!]") +} diff --git a/internal/services/cron/redditJob.go b/internal/services/cron/redditJob.go new file mode 100644 index 0000000..7cfe240 --- /dev/null +++ b/internal/services/cron/redditJob.go @@ -0,0 +1,33 @@ +package cron + +import ( + "log" + + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" + "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" +) + +// This is the main entry point to query all the reddit services +func (c *Cron) CheckReddit() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorReddit) + if err != nil { + log.Printf("[Reddit] No sources found to query - %v\r", err) + } + + for _, source := range sources { + if !source.Enabled { + continue + } + + log.Printf("[Reddit] Checking '%v'...", source.DisplayName) + rc := input.NewRedditClient(source) + raw, err := rc.GetContent() + if err != nil { + log.Println(err) + } + redditArticles := rc.ConvertToArticles(raw) + + c.SaveNewArticles(redditArticles, domain.SourceCollectorReddit) + } + log.Print("[Reddit] Done!") +} diff --git a/internal/services/cron/scheduler.go b/internal/services/cron/scheduler.go index edb89a0..de98b1b 100644 --- a/internal/services/cron/scheduler.go +++ b/internal/services/cron/scheduler.go @@ -3,83 +3,56 @@ package cron import ( "context" "database/sql" - "fmt" + "errors" "log" - "time" - "github.com/google/uuid" _ "github.com/lib/pq" "github.com/robfig/cron/v3" - "git.jamestombleson.com/jtom38/newsbot-api/internal/database" + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" "git.jamestombleson.com/jtom38/newsbot-api/internal/services" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/output" ) type Cron struct { - Db *database.Queries ctx *context.Context timer *cron.Cron + repo services.RepositoryService } -func openDatabase() (*database.Queries, error) { - _env := services.NewConfig() - connString := _env.GetConfig(services.Sql_Connection_String) - if connString == "" { - panic("Connection String is null!") - } - db, err := sql.Open("postgres", connString) - if err != nil { - panic(err) - } - - queries := database.New(db) - return queries, err -} - -func NewScheduler(ctx context.Context) *Cron { +func NewScheduler(ctx context.Context, conn *sql.DB) *Cron { c := &Cron{ - ctx: &ctx, + ctx: &ctx, + repo: services.NewRepositoryService(conn), } timer := cron.New() - queries, err := openDatabase() - if err != nil { - panic(err) - } - c.Db = queries //timer.AddFunc("*/5 * * * *", func() { go CheckCache() }) - features := services.NewConfig() + features := services.GetEnvConfig() - res, _ := features.GetFeature(services.FEATURE_ENABLE_REDDIT_BACKEND) - if res { + if features.RedditEnabled { timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() }) log.Print("[Input] Reddit backend was enabled") //go c.CheckReddit() } - res, _ = features.GetFeature(services.FEATURE_ENABLE_YOUTUBE_BACKEND) - if res { + if features.YoutubeEnabled { timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() }) log.Print("[Input] YouTube backend was enabled") } - res, _ = features.GetFeature(services.FEATURE_ENABLE_FFXIV_BACKEND) - if res { + if features.FfxivEnabled { timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() }) log.Print("[Input] FFXIV backend was enabled") } - res, _ = features.GetFeature(services.FEATURE_ENABLE_TWITCH_BACKEND) - if res { + if features.TwitchEnabled { timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() }) log.Print("[Input] Twitch backend was enabled") } - timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() }) - log.Print("[Output] Discord Output was enabled") + //timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() }) + //log.Print("[Output] Discord Output was enabled") c.timer = timer return c @@ -93,104 +66,7 @@ func (c *Cron) Stop() { c.timer.Stop() } -// This is the main entry point to query all the reddit services -func (c *Cron) CheckReddit() { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "reddit") - if err != nil { - log.Printf("[Reddit] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[Reddit] Checking '%v'...", source.Name) - rc := input.NewRedditClient(source) - raw, err := rc.GetContent() - if err != nil { - log.Println(err) - } - redditArticles := rc.ConvertToArticles(raw) - c.checkPosts(redditArticles, "Reddit") - } - log.Print("[Reddit] Done!") -} - -func (c *Cron) CheckYoutube() { - // Add call to the db to request youtube sources. - sources, err := c.Db.ListSourcesBySource(*c.ctx, "youtube") - if err != nil { - log.Printf("[Youtube] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[YouTube] Checking '%v'...", source.Name) - yc := input.NewYoutubeClient(source) - raw, err := yc.GetContent() - if err != nil { - log.Println(err) - } - c.checkPosts(raw, "YouTube") - } - log.Print("[YouTube] Done!") -} - -func (c *Cron) CheckFfxiv() { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "ffxiv") - if err != nil { - log.Printf("[FFXIV] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - fc := input.NewFFXIVClient(source) - items, err := fc.CheckSource() - if err != nil { - log.Println(err) - } - c.checkPosts(items, "FFXIV") - } - log.Printf("[FFXIV Done!]") -} - -func (c *Cron) CheckTwitch() error { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "twitch") - if err != nil { - log.Printf("[Twitch] No sources found to query - %v\r", err) - } - - tc, err := input.NewTwitchClient() - if err != nil { - return err - } - - err = tc.Login() - if err != nil { - return err - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[Twitch] Checking '%v'...", source.Name) - tc.ReplaceSourceRecord(source) - items, err := tc.GetContent() - if err != nil { - log.Println(err) - } - c.checkPosts(items, "Twitch") - } - - log.Print("[Twitch] Done!") - return nil -} - +/* TODO move to the sqlite queue func (c *Cron) CheckDiscordQueue() error { // Get items from the table queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50) @@ -260,55 +136,26 @@ func (c *Cron) CheckDiscordQueue() error { return nil } +*/ -func (c *Cron) checkPosts(posts []database.Article, sourceName string) error { +func (c Cron) SaveNewArticles(posts []domain.ArticleEntity, sourceName string) error { for _, item := range posts { - _, err := c.Db.GetArticleByUrl(*c.ctx, item.Url) + _, err := c.repo.Articles.GetByUrl(*c.ctx, item.Url) + if err == nil { + // This url is already known, so skip it + continue + } + + // Load the new article in the repository + rows, err := c.repo.Articles.Create(*c.ctx, item.SourceID, item.Tags, item.Title, item.Url, item.Thumbnail, item.Description, item.AuthorName, item.AuthorImageUrl, item.PubDate, item.IsVideo) if err != nil { - id := uuid.New() - - err := c.postArticle(id, item) - if err != nil { - return fmt.Errorf("[%v] Failed to post article - %v - %v.\r", sourceName, item.Url, err) - } - - err = c.addToDiscordQueue(id) - if err != nil { - return err - } + return err + } + if rows != 1 { + return errors.New("failed to create a new record for some reason") } } - time.Sleep(30 * time.Second) - return nil -} - -func (c *Cron) postArticle(id uuid.UUID, item database.Article) error { - err := c.Db.CreateArticle(*c.ctx, database.CreateArticleParams{ - ID: id, - Sourceid: item.Sourceid, - Tags: item.Tags, - Title: item.Title, - Url: item.Url, - Pubdate: item.Pubdate, - Video: item.Video, - Videoheight: item.Videoheight, - Videowidth: item.Videowidth, - Thumbnail: item.Thumbnail, - Description: item.Description, - Authorname: item.Authorname, - Authorimage: item.Authorimage, - }) - return err -} - -func (c *Cron) addToDiscordQueue(Id uuid.UUID) error { - err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{ - ID: uuid.New(), - Articleid: Id, - }) - if err != nil { - return err - } + return nil } diff --git a/internal/services/cron/scheduler_test.go b/internal/services/cron/scheduler_test.go index 14bbd38..f42e4e0 100644 --- a/internal/services/cron/scheduler_test.go +++ b/internal/services/cron/scheduler_test.go @@ -2,33 +2,70 @@ package cron_test import ( "context" + "database/sql" "testing" "git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron" + _ "github.com/glebarez/go-sqlite" + "github.com/pressly/goose/v3" ) -func TestInvokeTwitch(t *testing.T) { - -} - -// TODO add database mocks but not sure how to do that yet. func TestCheckReddit(t *testing.T) { + db, err := setupInMemoryDb() + if err != nil { + t.Log(err) + t.FailNow() + } + defer db.Close() + ctx := context.Background() - c := cron.NewScheduler(ctx) + c := cron.NewScheduler(ctx, db) c.CheckReddit() } func TestCheckYouTube(t *testing.T) { + db, err := setupInMemoryDb() + if err != nil { + t.Log(err) + t.FailNow() + } + defer db.Close() + ctx := context.Background() - c := cron.NewScheduler(ctx) + c := cron.NewScheduler(ctx, db) c.CheckYoutube() } func TestCheckTwitch(t *testing.T) { + db, err := setupInMemoryDb() + if err != nil { + t.Log(err) + t.FailNow() + } + defer db.Close() + ctx := context.Background() - c := cron.NewScheduler(ctx) - err := c.CheckTwitch() + c := cron.NewScheduler(ctx, db) + err = c.CheckTwitch() if err != nil { t.Error(err) } } + +func setupInMemoryDb() (*sql.DB, error) { + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + return nil, err + } + + err = goose.SetDialect("sqlite3") + if err != nil { + return nil, err + } + + err = goose.Up(db, "../database/migrations") + if err != nil { + return nil, err + } + return db, nil +} diff --git a/internal/services/cron/twitchJob.go b/internal/services/cron/twitchJob.go new file mode 100644 index 0000000..305b3c8 --- /dev/null +++ b/internal/services/cron/twitchJob.go @@ -0,0 +1,41 @@ +package cron + +import ( + "log" + + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" + "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" +) + +func (c *Cron) CheckTwitch() error { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorTwitch) + if err != nil { + log.Printf("[Twitch] No sources found to query - %v\r", err) + } + + tc, err := input.NewTwitchClient() + if err != nil { + return err + } + + err = tc.Login() + if err != nil { + return err + } + + for _, source := range sources { + if !source.Enabled { + continue + } + log.Printf("[Twitch] Checking '%v'...", source.DisplayName) + tc.ReplaceSourceRecord(source) + items, err := tc.GetContent() + if err != nil { + log.Println(err) + } + c.SaveNewArticles(items, domain.SourceCollectorTwitch) + } + + log.Print("[Twitch] Done!") + return nil +} diff --git a/internal/services/cron/youtube.go b/internal/services/cron/youtube.go new file mode 100644 index 0000000..6266e0c --- /dev/null +++ b/internal/services/cron/youtube.go @@ -0,0 +1,32 @@ +package cron + +import ( + "log" + + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" + "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" +) + +func (c *Cron) CheckYoutube() { + // Add call to the db to request youtube sources. + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorYoutube) + if err != nil { + log.Printf("[Youtube] No sources found to query - %v\r", err) + } + + for _, source := range sources { + if !source.Enabled { + continue + } + + log.Printf("[YouTube] Checking '%v'...", source.DisplayName) + + yc := input.NewYoutubeClient(source) + raw, err := yc.GetContent() + if err != nil { + log.Println(err) + } + c.SaveNewArticles(raw, domain.SourceCollectorYoutube) + } + log.Print("[YouTube] Done!") +}