broke cron out and moved to the new domain models

This commit is contained in:
James Tombleson 2024-04-29 16:29:54 -07:00
parent e48b64bbaa
commit ce45b509d3
6 changed files with 211 additions and 191 deletions

View File

@ -0,0 +1,30 @@
package cron
import (
"log"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
func (c *Cron) CheckFfxiv() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 10, domain.SourceCollectorFfxiv)
if err != nil {
log.Printf("[FFXIV] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
fc := input.NewFFXIVClient(source)
items, err := fc.CheckSource()
if err != nil {
log.Println(err)
}
c.SaveNewArticles(items, domain.SourceCollectorFfxiv)
}
log.Printf("[FFXIV Done!]")
}

View File

@ -0,0 +1,33 @@
package cron
import (
"log"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
// This is the main entry point to query all the reddit services
func (c *Cron) CheckReddit() {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorReddit)
if err != nil {
log.Printf("[Reddit] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Reddit] Checking '%v'...", source.DisplayName)
rc := input.NewRedditClient(source)
raw, err := rc.GetContent()
if err != nil {
log.Println(err)
}
redditArticles := rc.ConvertToArticles(raw)
c.SaveNewArticles(redditArticles, domain.SourceCollectorReddit)
}
log.Print("[Reddit] Done!")
}

View File

@ -3,83 +3,56 @@ package cron
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt" "errors"
"log" "log"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"git.jamestombleson.com/jtom38/newsbot-api/internal/database" "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services" "git.jamestombleson.com/jtom38/newsbot-api/internal/services"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/output"
) )
type Cron struct { type Cron struct {
Db *database.Queries
ctx *context.Context ctx *context.Context
timer *cron.Cron timer *cron.Cron
repo services.RepositoryService
} }
func openDatabase() (*database.Queries, error) { func NewScheduler(ctx context.Context, conn *sql.DB) *Cron {
_env := services.NewConfig()
connString := _env.GetConfig(services.Sql_Connection_String)
if connString == "" {
panic("Connection String is null!")
}
db, err := sql.Open("postgres", connString)
if err != nil {
panic(err)
}
queries := database.New(db)
return queries, err
}
func NewScheduler(ctx context.Context) *Cron {
c := &Cron{ c := &Cron{
ctx: &ctx, ctx: &ctx,
repo: services.NewRepositoryService(conn),
} }
timer := cron.New() timer := cron.New()
queries, err := openDatabase()
if err != nil {
panic(err)
}
c.Db = queries
//timer.AddFunc("*/5 * * * *", func() { go CheckCache() }) //timer.AddFunc("*/5 * * * *", func() { go CheckCache() })
features := services.NewConfig() features := services.GetEnvConfig()
res, _ := features.GetFeature(services.FEATURE_ENABLE_REDDIT_BACKEND) if features.RedditEnabled {
if res {
timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() }) timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() })
log.Print("[Input] Reddit backend was enabled") log.Print("[Input] Reddit backend was enabled")
//go c.CheckReddit() //go c.CheckReddit()
} }
res, _ = features.GetFeature(services.FEATURE_ENABLE_YOUTUBE_BACKEND) if features.YoutubeEnabled {
if res {
timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() }) timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() })
log.Print("[Input] YouTube backend was enabled") log.Print("[Input] YouTube backend was enabled")
} }
res, _ = features.GetFeature(services.FEATURE_ENABLE_FFXIV_BACKEND) if features.FfxivEnabled {
if res {
timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() }) timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
log.Print("[Input] FFXIV backend was enabled") log.Print("[Input] FFXIV backend was enabled")
} }
res, _ = features.GetFeature(services.FEATURE_ENABLE_TWITCH_BACKEND) if features.TwitchEnabled {
if res {
timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() }) timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() })
log.Print("[Input] Twitch backend was enabled") log.Print("[Input] Twitch backend was enabled")
} }
timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() }) //timer.AddFunc("*/5 * * * *", func() { go c.CheckDiscordQueue() })
log.Print("[Output] Discord Output was enabled") //log.Print("[Output] Discord Output was enabled")
c.timer = timer c.timer = timer
return c return c
@ -93,104 +66,7 @@ func (c *Cron) Stop() {
c.timer.Stop() c.timer.Stop()
} }
// This is the main entry point to query all the reddit services /* TODO move to the sqlite queue
func (c *Cron) CheckReddit() {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "reddit")
if err != nil {
log.Printf("[Reddit] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Reddit] Checking '%v'...", source.Name)
rc := input.NewRedditClient(source)
raw, err := rc.GetContent()
if err != nil {
log.Println(err)
}
redditArticles := rc.ConvertToArticles(raw)
c.checkPosts(redditArticles, "Reddit")
}
log.Print("[Reddit] Done!")
}
func (c *Cron) CheckYoutube() {
// Add call to the db to request youtube sources.
sources, err := c.Db.ListSourcesBySource(*c.ctx, "youtube")
if err != nil {
log.Printf("[Youtube] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[YouTube] Checking '%v'...", source.Name)
yc := input.NewYoutubeClient(source)
raw, err := yc.GetContent()
if err != nil {
log.Println(err)
}
c.checkPosts(raw, "YouTube")
}
log.Print("[YouTube] Done!")
}
func (c *Cron) CheckFfxiv() {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "ffxiv")
if err != nil {
log.Printf("[FFXIV] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
fc := input.NewFFXIVClient(source)
items, err := fc.CheckSource()
if err != nil {
log.Println(err)
}
c.checkPosts(items, "FFXIV")
}
log.Printf("[FFXIV Done!]")
}
func (c *Cron) CheckTwitch() error {
sources, err := c.Db.ListSourcesBySource(*c.ctx, "twitch")
if err != nil {
log.Printf("[Twitch] No sources found to query - %v\r", err)
}
tc, err := input.NewTwitchClient()
if err != nil {
return err
}
err = tc.Login()
if err != nil {
return err
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Twitch] Checking '%v'...", source.Name)
tc.ReplaceSourceRecord(source)
items, err := tc.GetContent()
if err != nil {
log.Println(err)
}
c.checkPosts(items, "Twitch")
}
log.Print("[Twitch] Done!")
return nil
}
func (c *Cron) CheckDiscordQueue() error { func (c *Cron) CheckDiscordQueue() error {
// Get items from the table // Get items from the table
queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50) queueItems, err := c.Db.ListDiscordQueueItems(*c.ctx, 50)
@ -260,55 +136,26 @@ func (c *Cron) CheckDiscordQueue() error {
return nil return nil
} }
*/
func (c *Cron) checkPosts(posts []database.Article, sourceName string) error { func (c Cron) SaveNewArticles(posts []domain.ArticleEntity, sourceName string) error {
for _, item := range posts { for _, item := range posts {
_, err := c.Db.GetArticleByUrl(*c.ctx, item.Url) _, err := c.repo.Articles.GetByUrl(*c.ctx, item.Url)
if err == nil {
// This url is already known, so skip it
continue
}
// Load the new article in the repository
rows, err := c.repo.Articles.Create(*c.ctx, item.SourceID, item.Tags, item.Title, item.Url, item.Thumbnail, item.Description, item.AuthorName, item.AuthorImageUrl, item.PubDate, item.IsVideo)
if err != nil { if err != nil {
id := uuid.New() return err
}
err := c.postArticle(id, item)
if err != nil {
return fmt.Errorf("[%v] Failed to post article - %v - %v.\r", sourceName, item.Url, err)
}
err = c.addToDiscordQueue(id)
if err != nil {
return err
}
if rows != 1 {
return errors.New("failed to create a new record for some reason")
} }
} }
time.Sleep(30 * time.Second)
return nil
}
func (c *Cron) postArticle(id uuid.UUID, item database.Article) error {
err := c.Db.CreateArticle(*c.ctx, database.CreateArticleParams{
ID: id,
Sourceid: item.Sourceid,
Tags: item.Tags,
Title: item.Title,
Url: item.Url,
Pubdate: item.Pubdate,
Video: item.Video,
Videoheight: item.Videoheight,
Videowidth: item.Videowidth,
Thumbnail: item.Thumbnail,
Description: item.Description,
Authorname: item.Authorname,
Authorimage: item.Authorimage,
})
return err
}
func (c *Cron) addToDiscordQueue(Id uuid.UUID) error {
err := c.Db.CreateDiscordQueue(*c.ctx, database.CreateDiscordQueueParams{
ID: uuid.New(),
Articleid: Id,
})
if err != nil {
return err
}
return nil return nil
} }

View File

@ -2,33 +2,70 @@ package cron_test
import ( import (
"context" "context"
"database/sql"
"testing" "testing"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron" "git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron"
_ "github.com/glebarez/go-sqlite"
"github.com/pressly/goose/v3"
) )
func TestInvokeTwitch(t *testing.T) {
}
// TODO add database mocks but not sure how to do that yet.
func TestCheckReddit(t *testing.T) { func TestCheckReddit(t *testing.T) {
db, err := setupInMemoryDb()
if err != nil {
t.Log(err)
t.FailNow()
}
defer db.Close()
ctx := context.Background() ctx := context.Background()
c := cron.NewScheduler(ctx) c := cron.NewScheduler(ctx, db)
c.CheckReddit() c.CheckReddit()
} }
func TestCheckYouTube(t *testing.T) { func TestCheckYouTube(t *testing.T) {
db, err := setupInMemoryDb()
if err != nil {
t.Log(err)
t.FailNow()
}
defer db.Close()
ctx := context.Background() ctx := context.Background()
c := cron.NewScheduler(ctx) c := cron.NewScheduler(ctx, db)
c.CheckYoutube() c.CheckYoutube()
} }
func TestCheckTwitch(t *testing.T) { func TestCheckTwitch(t *testing.T) {
db, err := setupInMemoryDb()
if err != nil {
t.Log(err)
t.FailNow()
}
defer db.Close()
ctx := context.Background() ctx := context.Background()
c := cron.NewScheduler(ctx) c := cron.NewScheduler(ctx, db)
err := c.CheckTwitch() err = c.CheckTwitch()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
} }
func setupInMemoryDb() (*sql.DB, error) {
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
return nil, err
}
err = goose.SetDialect("sqlite3")
if err != nil {
return nil, err
}
err = goose.Up(db, "../database/migrations")
if err != nil {
return nil, err
}
return db, nil
}

View File

@ -0,0 +1,41 @@
package cron
import (
"log"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
func (c *Cron) CheckTwitch() error {
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorTwitch)
if err != nil {
log.Printf("[Twitch] No sources found to query - %v\r", err)
}
tc, err := input.NewTwitchClient()
if err != nil {
return err
}
err = tc.Login()
if err != nil {
return err
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[Twitch] Checking '%v'...", source.DisplayName)
tc.ReplaceSourceRecord(source)
items, err := tc.GetContent()
if err != nil {
log.Println(err)
}
c.SaveNewArticles(items, domain.SourceCollectorTwitch)
}
log.Print("[Twitch] Done!")
return nil
}

View File

@ -0,0 +1,32 @@
package cron
import (
"log"
"git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
"git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
)
func (c *Cron) CheckYoutube() {
// Add call to the db to request youtube sources.
sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 100, domain.SourceCollectorYoutube)
if err != nil {
log.Printf("[Youtube] No sources found to query - %v\r", err)
}
for _, source := range sources {
if !source.Enabled {
continue
}
log.Printf("[YouTube] Checking '%v'...", source.DisplayName)
yc := input.NewYoutubeClient(source)
raw, err := yc.GetContent()
if err != nil {
log.Println(err)
}
c.SaveNewArticles(raw, domain.SourceCollectorYoutube)
}
log.Print("[YouTube] Done!")
}