cron is now cleaned up and only running rss for now

This commit is contained in:
James Tombleson 2024-05-01 18:26:32 -07:00
parent 77b5c2bfa4
commit ddf50077b5
5 changed files with 250 additions and 290 deletions

View File

@ -0,0 +1,220 @@
package cron
import (
"log"
"time"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
func (c *Cron) CollectRssPosts() {
log.Println("Starting ")
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorRss)
if err != nil {
log.Println(err)
}
for sourceIndex, source := range sources {
if !source.Enabled {
continue
}
rssClient := input.NewRssClient(source)
articles, err := rssClient.GetArticles()
if err != nil {
log.Println(err)
}
for _, article := range articles {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsCreated, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Println(err)
}
if rowsCreated != 1 {
log.Println("Got back the wrong number of rows")
}
}
if sourceIndex != len(sources) {
time.Sleep(time.Second * 30)
}
}
}
func (c *Cron) CollectRedditPosts() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorReddit)
if err != nil {
log.Printf("[Reddit] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Reddit] Checking '%v'...", source.DisplayName)
rc := input.NewRedditClient(source)
raw, err := rc.GetContent()
if err != nil {
log.Println(err)
}
redditArticles := rc.ConvertToArticles(raw)
for _, article := range redditArticles {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Printf("Failed to add a new reddit article to the database: %s", err)
}
if rowsAdded != 1 {
log.Printf("no error came back when data was added to the database but the expected row count is wrong")
}
}
}
log.Print("[Reddit] Done!")
}
func (c *Cron) CollectYoutubePosts() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorYoutube)
if err != nil {
log.Printf("[Youtube] No sources found to query - %v\r", err)
}
for sourceIndex, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[YouTube] Checking '%v'...", source.DisplayName)
yc := input.NewYoutubeClient(source)
raw, err := yc.GetContent()
if err != nil {
log.Println(err)
}
for _, article := range raw {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Printf("Failed to add a new youtube article to the database: %s", err)
}
if rowsAdded != 1 {
log.Printf("no error came back when data was added to the database but the expected row count is wrong")
}
}
if sourceIndex != len(sources) {
time.Sleep(time.Second * 30)
}
}
log.Print("[YouTube] Done!")
}
func (c *Cron) CollectFfxivPosts() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorFfxiv)
if err != nil {
log.Printf("[FFXIV] No sources found to query - %v\r", err)
}
for sourceIndex, source := range sources {
if !source.Enabled {
continue
}
fc := input.NewFFXIVClient(source)
items, err := fc.CheckSource()
if err != nil {
log.Println(err)
}
for _, article := range items {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Printf("Failed to add a new FFXIV article to the database: %s", err)
}
if rowsAdded != 1 {
log.Printf("no error came back when data was added to the database but the expected row count is wrong")
}
}
if sourceIndex != len(sources) {
time.Sleep(time.Second * 30)
}
}
log.Printf("[FFXIV Done!]")
}
func (c *Cron) CollectTwitchPosts() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorTwitch)
if err != nil {
log.Printf("[Twitch] No sources found to query - %v\r", err)
}
tc, err := input.NewTwitchClient()
if err != nil {
log.Println(err)
return
}
err = tc.Login()
if err != nil {
log.Println(err)
}
for sourceIndex, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Twitch] Checking '%v'...", source.DisplayName)
tc.ReplaceSourceRecord(source)
items, err := tc.GetContent()
if err != nil {
log.Println(err)
}
for _, article := range items {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsAdded, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Printf("Failed to add a new Twitch article to the database: %s", err)
}
if rowsAdded != 1 {
log.Printf("no error came back when data was added to the database but the expected row count is wrong")
}
}
if sourceIndex != len(sources) {
time.Sleep(time.Second * 30)
}
}
log.Print("[Twitch] Done!")
}

View File

@ -31,7 +31,7 @@ func TestRssPullsCorrectly(t *testing.T) {
}
client := cron.NewScheduler(ctx, conn)
client.CheckRssSources()
client.CollectRssPosts()
articles, err := db.Articles.ListByPage(ctx, 0, 100)
if err != nil {

View File

@ -1,59 +0,0 @@
package cron
import (
"log"
"time"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
func (c *Cron) CheckRssSources() {
log.Println("Starting ")
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorRss)
if err != nil {
log.Println(err)
}
for sourceIndex, source := range sources {
if !source.Enabled {
continue
}
rssClient := input.NewRssClient(source)
articles, err := rssClient.GetArticles()
if err != nil {
log.Println(err)
}
for _, article := range articles {
_, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
if err == nil {
continue
}
rowsCreated, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
if err != nil {
log.Println(err)
}
if rowsCreated != 1 {
log.Println("Got back the wrong number of rows")
}
}
if sourceIndex != len(sources) {
time.Sleep(time.Second * 30)
}
}
}
func (c *Cron) ListAllSourceRecords(sourceType string) ([]domain.SourceEntity, error) {
var records []domain.SourceEntity
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, sourceType)
if err != nil {
return records, err
}
return sources, nil
}

View File

@ -3,102 +3,37 @@ package cron
import (
"context"
"database/sql"
"fmt"
"log"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/robfig/cron/v3"
"git.jamestombleson.com/jtom38/newsbot-api/internal/database"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/output"
)
type Cron struct {
Db *database.Queries
ctx *context.Context
timer *cron.Cron
repo services.RepositoryService
}
func openDatabase() (*database.Queries, error) {
_env := services.NewConfig()
connString := _env.GetConfig(services.Sql_Connection_String)
if connString == "" {
panic("Connection String is null!")
}
db, err := sql.Open("postgres", connString)
if err != nil {
panic(err)
}
queries := database.New(db)
return queries, err
}
func NewScheduler(ctx context.Context) *Cron {
func NewScheduler(ctx context.Context, conn *sql.DB) *Cron {
c := &Cron{
ctx: &ctx,
repo: services.NewRepositoryService(conn),
}
timer := cron.New()
queries, err := openDatabase()
if err != nil {
panic(err)
}
c.Db = queries
//timer.AddFunc("*/5 * * * *", func() { go CheckCache() })
//features := services.GetEnvConfig()
features := services.NewConfig()
timer.AddFunc("5 * * * *", c.CheckRssSources)
//if features.RedditEnabled {
// timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() })
// log.Print("[Input] Reddit backend was enabled")
// //go c.CheckReddit()
//}
res, _ := features.GetFeature(services.FEATURE_ENABLE_REDDIT_BACKEND)
if res {
timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() })
log.Print("[Input] Reddit backend was enabled")
//go c.CheckReddit()
}
//if features.YoutubeEnabled {
// timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() })
// log.Print("[Input] YouTube backend was enabled")
//}
res, _ = features.GetFeature(services.FEATURE_ENABLE_YOUTUBE_BACKEND)
if res {
timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() })
log.Print("[Input] YouTube backend was enabled")
}
//if features.FfxivEnabled {
// timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
// log.Print("[Input] FFXIV backend was enabled")
//}
res, _ = features.GetFeature(services.FEATURE_ENABLE_FFXIV_BACKEND)
if res {
timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
log.Print("[Input] FFXIV backend was enabled")
}
//if features.TwitchEnabled {
// timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() })
// log.Print("[Input] Twitch backend was enabled")
//}
res, _ = features.GetFeature(services.FEATURE_ENABLE_TWITCH_BACKEND)
if res {
timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() })
log.Print("[Input] Twitch backend was enabled")
}
timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() })
log.Print("[Output] Discord Output was enabled")
timer.AddFunc("5 * * * *", c.CollectRssPosts)
//timer.AddFunc("5 1-23 * * *", c.CollectRedditPosts)
//timer.AddFunc("10 1-23 * * *", c.CheckYoutube)
//timer.AddFunc("5 5,10,15,20 * * *", c.CheckFfxiv)
//timer.AddFunc("15 1-23 * * *", c.CheckTwitch)
//timer.AddFunc("*/5 * * * *", c.CheckDiscordQueue)
c.timer = timer
return c
@ -112,105 +47,8 @@ func (c *Cron) Stop() {
c.timer.Stop()
}
// This is the main entry point to query all the reddit services
func (c *Cron) CheckReddit() {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "reddit")
if err != nil {
log.Printf("[Reddit] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Reddit] Checking '%v'...", source.Name)
rc := input.NewRedditClient(source)
raw, err := rc.GetContent()
if err != nil {
log.Println(err)
}
redditArticles := rc.ConvertToArticles(raw)
c.checkPosts(redditArticles, "Reddit")
}
log.Print("[Reddit] Done!")
}
func (c *Cron) CheckYoutube() {
// Add call to the db to request youtube sources.
sources, err := c.Db.ListSourcesBySource(*c.ctx, "youtube")
if err != nil {
log.Printf("[Youtube] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[YouTube] Checking '%v'...", source.Name)
yc := input.NewYoutubeClient(source)
raw, err := yc.GetContent()
if err != nil {
log.Println(err)
}
c.checkPosts(raw, "YouTube")
}
log.Print("[YouTube] Done!")
}
func (c *Cron) CheckFfxiv() {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "ffxiv")
if err != nil {
log.Printf("[FFXIV] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
fc := input.NewFFXIVClient(source)
items, err := fc.CheckSource()
if err != nil {
log.Println(err)
}
c.checkPosts(items, "FFXIV")
}
log.Printf("[FFXIV Done!]")
}
func (c *Cron) CheckTwitch() error {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "twitch")
if err != nil {
log.Printf("[Twitch] No sources found to query - %v\r", err)
}
tc, err := input.NewTwitchClient()
if err != nil {
return err
}
err = tc.Login()
if err != nil {
return err
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Twitch] Checking '%v'...", source.Name)
tc.ReplaceSourceRecord(source)
items, err := tc.GetContent()
if err != nil {
log.Println(err)
}
c.checkPosts(items, "Twitch")
}
log.Print("[Twitch] Done!")
return nil
}
func (c *Cron) CheckDiscordQueue() error {
/*
func (c *Cron) CheckDiscordQueue() {
// Get items from the table
queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50)
if err != nil {
@ -279,55 +117,15 @@ func (c *Cron) CheckDiscordQueue() error {
return nil
}
*/
func (c *Cron) checkPosts(posts []database.Article, sourceName string) error {
for _, item := range posts {
_, err := c.Db.GetArticleByUrl(*c.ctx, item.Url)
if err != nil {
id := uuid.New()
err := c.postArticle(id, item)
if err != nil {
return fmt.Errorf("[%v] Failed to post article - %v - %v.\r", sourceName, item.Url, err)
}
err = c.addToDiscordQueue(id)
if err != nil {
return err
}
}
}
time.Sleep(30 * time.Second)
return nil
}
func (c *Cron) postArticle(id uuid.UUID, item database.Article) error {
err := c.Db.CreateArticle(*c.ctx, database.CreateArticleParams{
ID: id,
Sourceid: item.Sourceid,
Tags: item.Tags,
Title: item.Title,
Url: item.Url,
Pubdate: item.Pubdate,
Video: item.Video,
Videoheight: item.Videoheight,
Videowidth: item.Videowidth,
Thumbnail: item.Thumbnail,
Description: item.Description,
Authorname: item.Authorname,
Authorimage: item.Authorimage,
})
return err
}
func (c *Cron) addToDiscordQueue(Id uuid.UUID) error {
err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{
ID: uuid.New(),
Articleid: Id,
})
if err != nil {
return err
}
return nil
}
//func (c *Cron) addToDiscordQueue(Id uuid.UUID) error {
// err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{
// ID: uuid.New(),
// Articleid: Id,
// })
// if err != nil {
// return err
// }
// return nil
//}

View File

@ -1,12 +1,12 @@
package cron_test
import (
"context"
"testing"
"database/sql"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron"
"github.com/pressly/goose/v3"
)
/*
func TestInvokeTwitch(t *testing.T) {
}
@ -15,7 +15,7 @@ func TestInvokeTwitch(t *testing.T) {
func TestCheckReddit(t *testing.T) {
ctx := context.Background()
c := cron.NewScheduler(ctx)
c.CheckReddit()
c.Col()
}
func TestCheckYouTube(t *testing.T) {
@ -32,6 +32,7 @@ func TestCheckTwitch(t *testing.T) {
t.Error(err)
}
}
*/
func setupInMemoryDb() (*sql.DB, error) {
db, err := sql.Open("sqlite", ":memory:")