From ddf50077b55a766a6ed8b57c356b6555480fe34c Mon Sep 17 00:00:00 2001 From: James Tombleson Date: Wed, 1 May 2024 18:26:32 -0700 Subject: [PATCH] cron is now cleaned up and only running rss for now --- internal/services/cron/collectors.go | 220 ++++++++++++++++ .../cron/{rss_test.go => collectors_test.go} | 4 +- internal/services/cron/rss.go | 59 ----- internal/services/cron/scheduler.go | 248 ++---------------- internal/services/cron/scheduler_test.go | 9 +- 5 files changed, 250 insertions(+), 290 deletions(-) create mode 100644 internal/services/cron/collectors.go rename internal/services/cron/{rss_test.go => collectors_test.go} (91%) delete mode 100644 internal/services/cron/rss.go diff --git a/internal/services/cron/collectors.go b/internal/services/cron/collectors.go new file mode 100644 index 0000000..6e0ed32 --- /dev/null +++ b/internal/services/cron/collectors.go @@ -0,0 +1,220 @@ +package cron + +import ( + "log" + "time" + + "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" + "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" +) + +func (c *Cron) CollectRssPosts() { + log.Println("Starting ") + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorRss) + if err != nil { + log.Println(err) + } + + for sourceIndex, source := range sources { + if !source.Enabled { + continue + } + + rssClient := input.NewRssClient(source) + articles, err := rssClient.GetArticles() + if err != nil { + log.Println(err) + } + + for _, article := range articles { + _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) + if err == nil { + continue + } + + rowsCreated, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) + if err != nil { + log.Println(err) + } + if rowsCreated != 1 { + log.Println("Got back the wrong number of rows") + } + } + + if sourceIndex != len(sources) { + time.Sleep(time.Second * 30) + } + } +} + +func (c *Cron) CollectRedditPosts() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorReddit) + if err != nil { + log.Printf("[Reddit] No sources found to query - %v\r", err) + } + + for _, source := range sources { + if !source.Enabled { + continue + } + + log.Printf("[Reddit] Checking '%v'...", source.DisplayName) + rc := input.NewRedditClient(source) + raw, err := rc.GetContent() + if err != nil { + log.Println(err) + } + + redditArticles := rc.ConvertToArticles(raw) + for _, article := range redditArticles { + _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) + if err == nil { + continue + } + + rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) + if err != nil { + log.Printf("Failed to add a new reddit article to the database: %s", err) + } + + if rowsAdded != 1 { + log.Printf("no error came back when data was added to the database but the expected row count is wrong") + } + } + } + log.Print("[Reddit] Done!") +} + +func (c *Cron) CollectYoutubePosts() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorYoutube) + if err != nil { + log.Printf("[Youtube] No sources found to query - %v\r", err) + } + + for sourceIndex, source := range sources { + if !source.Enabled { + continue + } + + log.Printf("[YouTube] Checking '%v'...", source.DisplayName) + yc := input.NewYoutubeClient(source) + raw, err := yc.GetContent() + if err != nil { + log.Println(err) + } + + for _, article := range raw { + _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) + if err == nil { + continue + } + + rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) + if err != nil { + log.Printf("Failed to add a new youtube article to the database: %s", err) + } + + if rowsAdded != 1 { + log.Printf("no error came back when data was added to the database but the expected row count is wrong") + } + } + + if sourceIndex != len(sources) { + time.Sleep(time.Second * 30) + } + } + log.Print("[YouTube] Done!") +} + +func (c *Cron) CollectFfxivPosts() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorFfxiv) + if err != nil { + log.Printf("[FFXIV] No sources found to query - %v\r", err) + } + + for sourceIndex, source := range sources { + if !source.Enabled { + continue + } + + fc := input.NewFFXIVClient(source) + items, err := fc.CheckSource() + if err != nil { + log.Println(err) + } + + for _, article := range items { + _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) + if err == nil { + continue + } + + rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) + if err != nil { + log.Printf("Failed to add a new FFXIV article to the database: %s", err) + } + + if rowsAdded != 1 { + log.Printf("no error came back when data was added to the database but the expected row count is wrong") + } + } + + if sourceIndex != len(sources) { + time.Sleep(time.Second * 30) + } + } + log.Printf("[FFXIV Done!]") +} + +func (c *Cron) CollectTwitchPosts() { + sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorTwitch) + if err != nil { + log.Printf("[Twitch] No sources found to query - %v\r", err) + } + + tc, err := input.NewTwitchClient() + if err != nil { + log.Println(err) + return + } + + err = tc.Login() + if err != nil { + log.Println(err) + } + + for sourceIndex, source := range sources { + if !source.Enabled { + continue + } + + log.Printf("[Twitch] Checking '%v'...", source.DisplayName) + tc.ReplaceSourceRecord(source) + items, err := tc.GetContent() + if err != nil { + log.Println(err) + } + + for _, article := range items { + _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) + if err == nil { + continue + } + + rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) + if err != nil { + log.Printf("Failed to add a new Twitch article to the database: %s", err) + } + + if rowsAdded != 1 { + log.Printf("no error came back when data was added to the database but the expected row count is wrong") + } + } + + if sourceIndex != len(sources) { + time.Sleep(time.Second * 30) + } + } + + log.Print("[Twitch] Done!") +} diff --git a/internal/services/cron/rss_test.go b/internal/services/cron/collectors_test.go similarity index 91% rename from internal/services/cron/rss_test.go rename to internal/services/cron/collectors_test.go index 4ab8da7..090d12d 100644 --- a/internal/services/cron/rss_test.go +++ b/internal/services/cron/collectors_test.go @@ -19,7 +19,7 @@ func TestRssPullsCorrectly(t *testing.T) { ctx := context.Background() db := services.NewRepositoryService(conn) - rowsCreated, err := db.Sources.Create(ctx, domain.SourceCollectorRss, "Gitea - Newsbot.api", "https://git.jamestombleson.com/jtom38/newsbot-api.rss", "rss,gitea,newsbot.api",true) + rowsCreated, err := db.Sources.Create(ctx, domain.SourceCollectorRss, "Gitea - Newsbot.api", "https://git.jamestombleson.com/jtom38/newsbot-api.rss", "rss,gitea,newsbot.api", true) if err != nil { t.Error(err) t.FailNow() @@ -31,7 +31,7 @@ func TestRssPullsCorrectly(t *testing.T) { } client := cron.NewScheduler(ctx, conn) - client.CheckRssSources() + client.CollectRssPosts() articles, err := db.Articles.ListByPage(ctx, 0, 100) if err != nil { diff --git a/internal/services/cron/rss.go b/internal/services/cron/rss.go deleted file mode 100644 index fce1a0d..0000000 --- a/internal/services/cron/rss.go +++ /dev/null @@ -1,59 +0,0 @@ -package cron - -import ( - "log" - "time" - - "git.jamestombleson.com/jtom38/newsbot-api/internal/domain" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" -) - -func (c *Cron) CheckRssSources() { - log.Println("Starting ") - sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorRss) - if err != nil { - log.Println(err) - } - - for sourceIndex, source := range sources { - if !source.Enabled { - continue - } - - rssClient := input.NewRssClient(source) - articles, err := rssClient.GetArticles() - if err != nil { - log.Println(err) - } - - for _, article := range articles { - _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url) - if err == nil { - continue - } - - rowsCreated, err := c.repo.Articles.CreateFromEntity(*c.ctx, article) - if err != nil { - log.Println(err) - } - if rowsCreated != 1 { - log.Println("Got back the wrong number of rows") - } - } - - if sourceIndex != len(sources) { - time.Sleep(time.Second * 30) - } - } -} - -func (c *Cron) ListAllSourceRecords(sourceType string) ([]domain.SourceEntity, error) { - var records []domain.SourceEntity - - sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, sourceType) - if err != nil { - return records, err - } - - return sources, nil -} diff --git a/internal/services/cron/scheduler.go b/internal/services/cron/scheduler.go index f2f978c..594bec4 100644 --- a/internal/services/cron/scheduler.go +++ b/internal/services/cron/scheduler.go @@ -3,102 +3,37 @@ package cron import ( "context" "database/sql" - "fmt" - "log" - "time" - "github.com/google/uuid" _ "github.com/lib/pq" "github.com/robfig/cron/v3" "git.jamestombleson.com/jtom38/newsbot-api/internal/database" "git.jamestombleson.com/jtom38/newsbot-api/internal/services" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/output" ) type Cron struct { Db *database.Queries ctx *context.Context timer *cron.Cron + repo services.RepositoryService } -func openDatabase() (*database.Queries, error) { - _env := services.NewConfig() - connString := _env.GetConfig(services.Sql_Connection_String) - if connString == "" { - panic("Connection String is null!") - } - db, err := sql.Open("postgres", connString) - if err != nil { - panic(err) - } - - queries := database.New(db) - return queries, err -} - -func NewScheduler(ctx context.Context) *Cron { +func NewScheduler(ctx context.Context, conn *sql.DB) *Cron { c := &Cron{ - ctx: &ctx, + ctx: &ctx, + repo: services.NewRepositoryService(conn), } timer := cron.New() - queries, err := openDatabase() - if err != nil { - panic(err) - } - c.Db = queries //timer.AddFunc("*/5 * * * *", func() { go CheckCache() }) //features := services.GetEnvConfig() - features := services.NewConfig() - timer.AddFunc("5 * * * *", c.CheckRssSources) - - //if features.RedditEnabled { - // timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() }) - // log.Print("[Input] Reddit backend was enabled") - // //go c.CheckReddit() - //} - res, _ := features.GetFeature(services.FEATURE_ENABLE_REDDIT_BACKEND) - if res { - timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() }) - log.Print("[Input] Reddit backend was enabled") - //go c.CheckReddit() - } - - //if features.YoutubeEnabled { - // timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() }) - // log.Print("[Input] YouTube backend was enabled") - //} - res, _ = features.GetFeature(services.FEATURE_ENABLE_YOUTUBE_BACKEND) - if res { - timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() }) - log.Print("[Input] YouTube backend was enabled") - } - - //if features.FfxivEnabled { - // timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() }) - // log.Print("[Input] FFXIV backend was enabled") - //} - res, _ = features.GetFeature(services.FEATURE_ENABLE_FFXIV_BACKEND) - if res { - timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() }) - log.Print("[Input] FFXIV backend was enabled") - } - - //if features.TwitchEnabled { - // timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() }) - // log.Print("[Input] Twitch backend was enabled") - //} - res, _ = features.GetFeature(services.FEATURE_ENABLE_TWITCH_BACKEND) - if res { - timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() }) - log.Print("[Input] Twitch backend was enabled") - } - - timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() }) - log.Print("[Output] Discord Output was enabled") + timer.AddFunc("5 * * * *", c.CollectRssPosts) + //timer.AddFunc("5 1-23 * * *", c.CollectRedditPosts) + //timer.AddFunc("10 1-23 * * *", c.CheckYoutube) + //timer.AddFunc("5 5,10,15,20 * * *", c.CheckFfxiv) + //timer.AddFunc("15 1-23 * * *", c.CheckTwitch) + //timer.AddFunc("*/5 * * * *", c.CheckDiscordQueue) c.timer = timer return c @@ -112,105 +47,8 @@ func (c *Cron) Stop() { c.timer.Stop() } -// This is the main entry point to query all the reddit services -func (c *Cron) CheckReddit() { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "reddit") - if err != nil { - log.Printf("[Reddit] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[Reddit] Checking '%v'...", source.Name) - rc := input.NewRedditClient(source) - raw, err := rc.GetContent() - if err != nil { - log.Println(err) - } - redditArticles := rc.ConvertToArticles(raw) - c.checkPosts(redditArticles, "Reddit") - } - log.Print("[Reddit] Done!") -} - -func (c *Cron) CheckYoutube() { - // Add call to the db to request youtube sources. - sources, err := c.Db.ListSourcesBySource(*c.ctx, "youtube") - if err != nil { - log.Printf("[Youtube] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[YouTube] Checking '%v'...", source.Name) - yc := input.NewYoutubeClient(source) - raw, err := yc.GetContent() - if err != nil { - log.Println(err) - } - c.checkPosts(raw, "YouTube") - } - log.Print("[YouTube] Done!") -} - -func (c *Cron) CheckFfxiv() { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "ffxiv") - if err != nil { - log.Printf("[FFXIV] No sources found to query - %v\r", err) - } - - for _, source := range sources { - if !source.Enabled { - continue - } - fc := input.NewFFXIVClient(source) - items, err := fc.CheckSource() - if err != nil { - log.Println(err) - } - c.checkPosts(items, "FFXIV") - } - log.Printf("[FFXIV Done!]") -} - -func (c *Cron) CheckTwitch() error { - sources, err := c.Db.ListSourcesBySource(*c.ctx, "twitch") - if err != nil { - log.Printf("[Twitch] No sources found to query - %v\r", err) - } - - tc, err := input.NewTwitchClient() - if err != nil { - return err - } - - err = tc.Login() - if err != nil { - return err - } - - for _, source := range sources { - if !source.Enabled { - continue - } - log.Printf("[Twitch] Checking '%v'...", source.Name) - tc.ReplaceSourceRecord(source) - items, err := tc.GetContent() - if err != nil { - log.Println(err) - } - c.checkPosts(items, "Twitch") - } - - log.Print("[Twitch] Done!") - return nil -} - -func (c *Cron) CheckDiscordQueue() error { +/* +func (c *Cron) CheckDiscordQueue() { // Get items from the table queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50) if err != nil { @@ -279,55 +117,15 @@ func (c *Cron) CheckDiscordQueue() error { return nil } +*/ -func (c *Cron) checkPosts(posts []database.Article, sourceName string) error { - for _, item := range posts { - _, err := c.Db.GetArticleByUrl(*c.ctx, item.Url) - if err != nil { - id := uuid.New() - - err := c.postArticle(id, item) - if err != nil { - return fmt.Errorf("[%v] Failed to post article - %v - %v.\r", sourceName, item.Url, err) - } - - err = c.addToDiscordQueue(id) - if err != nil { - return err - } - - } - } - time.Sleep(30 * time.Second) - return nil -} - -func (c *Cron) postArticle(id uuid.UUID, item database.Article) error { - err := c.Db.CreateArticle(*c.ctx, database.CreateArticleParams{ - ID: id, - Sourceid: item.Sourceid, - Tags: item.Tags, - Title: item.Title, - Url: item.Url, - Pubdate: item.Pubdate, - Video: item.Video, - Videoheight: item.Videoheight, - Videowidth: item.Videowidth, - Thumbnail: item.Thumbnail, - Description: item.Description, - Authorname: item.Authorname, - Authorimage: item.Authorimage, - }) - return err -} - -func (c *Cron) addToDiscordQueue(Id uuid.UUID) error { - err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{ - ID: uuid.New(), - Articleid: Id, - }) - if err != nil { - return err - } - return nil -} +//func (c *Cron) addToDiscordQueue(Id uuid.UUID) error { +// err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{ +// ID: uuid.New(), +// Articleid: Id, +// }) +// if err != nil { +// return err +// } +// return nil +//} diff --git a/internal/services/cron/scheduler_test.go b/internal/services/cron/scheduler_test.go index a0dda86..8332a5e 100644 --- a/internal/services/cron/scheduler_test.go +++ b/internal/services/cron/scheduler_test.go @@ -1,12 +1,12 @@ package cron_test import ( - "context" - "testing" + "database/sql" - "git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron" + "github.com/pressly/goose/v3" ) +/* func TestInvokeTwitch(t *testing.T) { } @@ -15,7 +15,7 @@ func TestInvokeTwitch(t *testing.T) { func TestCheckReddit(t *testing.T) { ctx := context.Background() c := cron.NewScheduler(ctx) - c.CheckReddit() + c.Col() } func TestCheckYouTube(t *testing.T) { @@ -32,6 +32,7 @@ func TestCheckTwitch(t *testing.T) { t.Error(err) } } +*/ func setupInMemoryDb() (*sql.DB, error) { db, err := sql.Open("sqlite", ":memory:")