features/rss-job #6
@@ -24,6 +24,7 @@ type ArticlesRepo interface {
     ListByPublishDate(ctx context.Context, page, limit int, orderBy string) ([]domain.ArticleEntity, error)
     ListBySource(ctx context.Context, page, limit, sourceId int, orderBy string) ([]domain.ArticleEntity, error)
     Create(ctx context.Context, sourceId int64, tags, title, url, thumbnailUrl, description, authorName, authorImageUrl string, pubDate time.Time, isVideo bool) (int64, error)
+    CreateFromEntity(ctx context.Context, entity domain.ArticleEntity) (int64, error)
 }
 
 type ArticleRepository struct {
@@ -192,6 +193,22 @@ func (ar ArticleRepository) Create(ctx context.Context, sourceId int64, tags, ti
     return 1, nil
 }
 
+func (ar ArticleRepository) CreateFromEntity(ctx context.Context, entity domain.ArticleEntity) (int64, error) {
+    dt := time.Now()
+    queryBuilder := sqlbuilder.NewInsertBuilder()
+    queryBuilder.InsertInto("articles")
+    queryBuilder.Cols("UpdatedAt", "CreatedAt", "DeletedAt", "SourceId", "Tags", "Title", "Url", "PubDate", "IsVideo", "ThumbnailUrl", "Description", "AuthorName", "AuthorImageUrl")
+    queryBuilder.Values(dt, dt, timeZero, entity.SourceID, entity.Tags, entity.Title, entity.Url, entity.PubDate, entity.IsVideo, entity.Thumbnail, entity.Description, entity.AuthorName, entity.AuthorImageUrl)
+    query, args := queryBuilder.Build()
+
+    _, err := ar.conn.ExecContext(ctx, query, args...)
+    if err != nil {
+        return 0, err
+    }
+
+    return 1, nil
+}
+
 func (ur ArticleRepository) processRows(rows *sql.Rows) []domain.ArticleEntity {
     items := []domain.ArticleEntity{}
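A minimal sketch of how the new entity-based create path can be driven, assuming the same wiring the tests later in this PR use (services.NewRepositoryService over a *sql.DB). The connection setup is left as a placeholder and only fields that appear elsewhere in this diff are populated:

package main

import (
    "context"
    "database/sql"
    "log"
    "time"

    "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
    "git.jamestombleson.com/jtom38/newsbot-api/internal/services"
)

func main() {
    var conn *sql.DB // assumption: opened and migrated elsewhere, as setupInMemoryDb does in the tests below
    repo := services.NewRepositoryService(conn)

    article := domain.ArticleEntity{
        SourceID: 1,
        Title:    "Example post",
        Url:      "https://example.invalid/post",
        Tags:     "rss,example",
        PubDate:  time.Now(),
    }

    // CreateFromEntity reports a single affected row on success.
    rows, err := repo.Articles.CreateFromEntity(context.Background(), article)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("rows created:", rows)
}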
internal/services/cron/rss.go (new file, 59 lines)
@@ -0,0 +1,59 @@
+package cron
+
+import (
+    "log"
+    "time"
+
+    "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
+    "git.jamestombleson.com/jtom38/newsbot-api/internal/services/input"
+)
+
+func (c *Cron) CheckRssSources() {
+    log.Println("Starting the RSS source check")
+    sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, domain.SourceCollectorRss)
+    if err != nil {
+        log.Println(err)
+    }
+
+    for sourceIndex, source := range sources {
+        if !source.Enabled {
+            continue
+        }
+
+        rssClient := input.NewRssClient(source)
+        articles, err := rssClient.GetArticles()
+        if err != nil {
+            log.Println(err)
+        }
+
+        for _, article := range articles {
+            _, err := c.repo.Articles.GetByUrl(*c.ctx, article.Url)
+            if err == nil {
+                continue
+            }
+
+            rowsCreated, err := c.repo.Articles.CreateFromEntity(*c.ctx, article)
+            if err != nil {
+                log.Println(err)
+            }
+            if rowsCreated != 1 {
+                log.Println("Got back the wrong number of rows")
+            }
+        }
+
+        if sourceIndex != len(sources)-1 { // pause between sources, but not after the last one
+            time.Sleep(time.Second * 30)
+        }
+    }
+}
+
+func (c *Cron) ListAllSourceRecords(sourceType string) ([]domain.SourceEntity, error) {
+    var records []domain.SourceEntity
+
+    sources, err := c.repo.Sources.ListBySource(*c.ctx, 0, 1000, sourceType)
+    if err != nil {
+        return records, err
+    }
+
+    return sources, nil
+}
internal/services/cron/rss_test.go (new file, 43 lines)
@@ -0,0 +1,43 @@
+package cron_test
+
+import (
+    "context"
+    "testing"
+
+    "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
+    "git.jamestombleson.com/jtom38/newsbot-api/internal/services"
+    "git.jamestombleson.com/jtom38/newsbot-api/internal/services/cron"
+)
+
+func TestRssPullsCorrectly(t *testing.T) {
+    conn, err := setupInMemoryDb()
+    if err != nil {
+        t.Error(err)
+        t.FailNow()
+    }
+    defer conn.Close()
+
+    ctx := context.Background()
+    db := services.NewRepositoryService(conn)
+    rowsCreated, err := db.Sources.Create(ctx, domain.SourceCollectorRss, "Gitea - Newsbot.api", "https://git.jamestombleson.com/jtom38/newsbot-api.rss", "rss,gitea,newsbot.api", true)
+    if err != nil {
+        t.Error(err)
+        t.FailNow()
+    }
+
+    if rowsCreated != 1 {
+        t.Error("failed to create the source record")
+        t.FailNow()
+    }
+
+    client := cron.NewScheduler(ctx, conn)
+    client.CheckRssSources()
+
+    articles, err := db.Articles.ListByPage(ctx, 0, 100)
+    if err != nil {
+        t.Error(err)
+        t.FailNow()
+    }
+
+    t.Log(len(articles))
+}
@@ -42,7 +42,6 @@ func NewScheduler(ctx context.Context) *Cron {
     c := &Cron{
         ctx: &ctx,
     }
-
     timer := cron.New()
     queries, err := openDatabase()
     if err != nil {
@@ -51,8 +50,16 @@ func NewScheduler(ctx context.Context) *Cron {
     c.Db = queries
 
     //timer.AddFunc("*/5 * * * *", func() { go CheckCache() })
+    //features := services.GetEnvConfig()
     features := services.NewConfig()
 
+    timer.AddFunc("5 * * * *", c.CheckRssSources)
+
+    //if features.RedditEnabled {
+    // timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() })
+    // log.Print("[Input] Reddit backend was enabled")
+    // //go c.CheckReddit()
+    //}
     res, _ := features.GetFeature(services.FEATURE_ENABLE_REDDIT_BACKEND)
     if res {
         timer.AddFunc("5 1-23 * * *", func() { go c.CheckReddit() })
@@ -60,18 +67,30 @@
         //go c.CheckReddit()
     }
 
+    //if features.YoutubeEnabled {
+    // timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() })
+    // log.Print("[Input] YouTube backend was enabled")
+    //}
     res, _ = features.GetFeature(services.FEATURE_ENABLE_YOUTUBE_BACKEND)
     if res {
         timer.AddFunc("10 1-23 * * *", func() { go c.CheckYoutube() })
         log.Print("[Input] YouTube backend was enabled")
     }
 
+    //if features.FfxivEnabled {
+    // timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
+    // log.Print("[Input] FFXIV backend was enabled")
+    //}
     res, _ = features.GetFeature(services.FEATURE_ENABLE_FFXIV_BACKEND)
     if res {
         timer.AddFunc("5 5,10,15,20 * * *", func() { go c.CheckFfxiv() })
         log.Print("[Input] FFXIV backend was enabled")
     }
 
+    //if features.TwitchEnabled {
+    // timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() })
+    // log.Print("[Input] Twitch backend was enabled")
+    //}
     res, _ = features.GetFeature(services.FEATURE_ENABLE_TWITCH_BACKEND)
     if res {
         timer.AddFunc("15 1-23 * * *", func() { go c.CheckTwitch() })
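The schedule strings above use the standard five-field cron format, presumably via the robfig/cron package (an assumption; only cron.New and AddFunc are visible in this diff). A standalone sketch of what the new RSS entry evaluates to:

package main

import (
    "log"

    "github.com/robfig/cron/v3" // assumption: the package behind timer := cron.New()
)

func main() {
    timer := cron.New() // five-field specs: minute hour day-of-month month day-of-week

    // "5 * * * *" fires at minute 5 of every hour, matching the new RSS job above;
    // "5 1-23 * * *" fires at minute 5 of hours 01 through 23, as the Reddit job does.
    if _, err := timer.AddFunc("5 * * * *", func() { log.Println("checking RSS sources") }); err != nil {
        log.Fatal(err)
    }

    timer.Start()
    select {} // block so the scheduled job keeps running
}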
@@ -32,3 +32,21 @@ func TestCheckTwitch(t *testing.T) {
         t.Error(err)
     }
 }
+
+func setupInMemoryDb() (*sql.DB, error) {
+    db, err := sql.Open("sqlite", ":memory:")
+    if err != nil {
+        return nil, err
+    }
+
+    err = goose.SetDialect("sqlite3")
+    if err != nil {
+        return nil, err
+    }
+
+    err = goose.Up(db, "../../database/migrations")
+    if err != nil {
+        return nil, err
+    }
+    return db, nil
+}
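setupInMemoryDb relies on a driver registered under the name "sqlite" and on goose, but the matching imports sit outside this hunk. A sketch of one import set that would satisfy it, plus a hypothetical openTestDb wrapper that ties the connection to a test's lifetime; the module paths are assumptions (goose here would be github.com/pressly/goose/v3):

package cron_test

import (
    "database/sql"
    "testing"

    _ "modernc.org/sqlite" // assumption: a CGo-free driver that registers the "sqlite" name used above
)

// openTestDb is a hypothetical convenience wrapper around setupInMemoryDb that
// closes the in-memory database automatically when the test finishes.
func openTestDb(t *testing.T) *sql.DB {
    t.Helper()
    db, err := setupInMemoryDb()
    if err != nil {
        t.Fatal(err)
    }
    t.Cleanup(func() { db.Close() })
    return db
}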
@@ -1,14 +1,16 @@
 package input
 
 import (
-    "fmt"
-    "log"
+    "strings"
 
     "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
-    "git.jamestombleson.com/jtom38/newsbot-api/internal/services/cache"
     "github.com/mmcdole/gofeed"
 )
 
+type FeedInput interface {
+    GetArticles() ([]domain.ArticleEntity, error)
+}
+
 type rssClient struct {
     SourceRecord domain.SourceEntity
 }
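Since FeedInput appears intended to describe rssClient, a compile-time assertion in the same package would catch any future drift between the interface and the concrete GetArticles signature, such as the slice-versus-single-entity mismatch corrected above. A small sketch:

package input

// The build fails on this line if rssClient stops satisfying FeedInput,
// for example if GetArticles changes its return types again.
var _ FeedInput = rssClient{}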
@@ -21,39 +23,36 @@ func NewRssClient(sourceRecord domain.SourceEntity) rssClient {
     return client
 }
 
-//func (rc rssClient) ReplaceSourceRecord(source model.Sources) {
-//rc.SourceRecord = source
-//}
-
-func (rc rssClient) getCacheGroup() string {
-    return fmt.Sprintf("rss-%v", rc.SourceRecord.DisplayName)
-}
-
-func (rc rssClient) GetContent() error {
-    feed, err := rc.PullFeed()
-    if err != nil {
-        return err
-    }
-
-    cacheClient := cache.NewCacheClient(rc.getCacheGroup())
-
-    for _, item := range feed.Items {
-        log.Println(item)
-
-        cacheClient.FindByValue(item.Link)
-
-    }
-
-    return nil
-}
-
-func (rc rssClient) PullFeed() (*gofeed.Feed, error) {
-    feedUri := fmt.Sprintf("%v", rc.SourceRecord.Url)
-    fp := gofeed.NewParser()
-    feed, err := fp.ParseURL(feedUri)
+func (rc rssClient) GetArticles() ([]domain.ArticleEntity, error) {
+    parser := gofeed.NewParser()
+    feed, err := parser.ParseURL(rc.SourceRecord.Url)
     if err != nil {
         return nil, err
     }
 
-    return feed, nil
+    sourceTags := strings.Split(rc.SourceRecord.Tags, ",")
+    var articles []domain.ArticleEntity
+    for _, post := range feed.Items {
+        article := domain.ArticleEntity{
+            SourceID:    rc.SourceRecord.ID,
+            Title:       post.Title,
+            Description: post.Content,
+            Url:         post.Link,
+            PubDate:     *post.PublishedParsed,
+            AuthorName:  post.Author.Email,
+        }
+
+        var postTags []string
+        postTags = append(postTags, sourceTags...)
+        postTags = append(postTags, post.Categories...)
+        article.Tags = strings.Join(postTags, ",")
+
+        if post.Image == nil {
+            article.Thumbnail = ""
+        }
+
+        articles = append(articles, article)
+    }
+
+    return articles, nil
 }
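GetArticles dereferences post.PublishedParsed and reads post.Author directly; both are optional in gofeed and can be nil for sparse feeds, as can post.Image. A defensive variant of the item mapping, sketched as a hypothetical helper; the time.Now fallback is an assumption and tag handling is left to the caller:

package input

import (
    "time"

    "git.jamestombleson.com/jtom38/newsbot-api/internal/domain"
    "github.com/mmcdole/gofeed"
)

// toArticle is a hypothetical helper showing a nil-safe version of the
// mapping done inside GetArticles.
func toArticle(source domain.SourceEntity, post *gofeed.Item) domain.ArticleEntity {
    published := time.Now() // assumption: fall back to collection time when the item has no date
    if post.PublishedParsed != nil {
        published = *post.PublishedParsed
    }

    authorName := ""
    if post.Author != nil {
        authorName = post.Author.Email // matches the field GetArticles reads above
    }

    thumbnail := ""
    if post.Image != nil {
        thumbnail = post.Image.URL
    }

    return domain.ArticleEntity{
        SourceID:    source.ID,
        Title:       post.Title,
        Description: post.Content,
        Url:         post.Link,
        PubDate:     published,
        AuthorName:  authorName,
        Thumbnail:   thumbnail,
    }
}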
@@ -19,12 +19,23 @@ func TestRssClientConstructor(t *testing.T) {
 
 func TestRssGetFeed(t *testing.T) {
     client := input.NewRssClient(rssRecord)
-    feed, err := client.PullFeed()
+    _, err := client.GetArticles()
     if err != nil {
         t.Error(err)
     }
-    if len(feed.Items) >= 0 {
-        t.Error("failed to collect items from the fees")
+}
+
+func TestRssAgainstGita(t *testing.T) {
+    client := input.NewRssClient(domain.SourceEntity{
+        ID:          2,
+        DisplayName: "Gitea - Newsbot-api",
+        Source:      domain.SourceCollectorRss,
+        Url:         "https://git.jamestombleson.com/jtom38/newsbot-api.rss",
+        Tags:        "rss,gitea,newsbot-api",
+    })
+    _, err := client.GetArticles()
+    if err != nil {
+        t.Error(err)
     }
 
 }