Newsbot.Collector/Newsbot.Collector.Services/Jobs/RssWatcherJob.cs
James Tombleson 9be985da0a
Features/adding youtube (#13)
* Found the meta tags on YouTube pages in the body and updated the client to pull them out.

* Updated the namespace on a test.

* I think formatting cleaned this up.

* Seed migrations have been cleaned up to get my configs out and move them to a script.

* Updates to ISourcesRepository.cs to allow new calls to the db.

* Formatter.

* DB models updated. Icon can now track a SourceID and a source can have a YouTube ID.

* Updated the API logger to ignore OTel if no connection string is given.

* Updated the Docker init so I can run migrations from the image.

* Seed was updated to reflect the new API changes.

* Updated SourcesController.cs to grab icon data.

* Added Reddit const values.

* Minor changes to HtmlPageReader.cs.

* Jobs now pull in the config section to bundle values.

* Removed the YouTube API; it is not needed anymore.

* Test updates.
2023-03-31 22:49:39 -07:00

using System.ServiceModel.Syndication;
using System.Xml;
using Newsbot.Collector.Database.Repositories;
using Newsbot.Collector.Domain.Consts;
using Newsbot.Collector.Domain.Interfaces;
using Newsbot.Collector.Domain.Models;
using Newsbot.Collector.Domain.Models.Config;
using Newsbot.Collector.Services.HtmlParser;
using Serilog;

namespace Newsbot.Collector.Services.Jobs;

public class RssWatcherJobOptions
{
    //public string? ConnectionString { get; init; }
    //public string? OpenTelemetry { get; init; }
    public ConfigSectionConnectionStrings? ConnectionStrings { get; set; }
    public ConfigSectionRssModel? Config { get; set; }
}
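
// A minimal sketch (assuming Microsoft.Extensions.Configuration with the binder
// package, plus hypothetical "ConnectionStrings" and "RssWatcher" section names)
// of how this options bundle might be built before the job is scheduled:
//
//   var options = new RssWatcherJobOptions
//   {
//       ConnectionStrings = configuration.GetSection("ConnectionStrings")
//           .Get<ConfigSectionConnectionStrings>(),
//       Config = configuration.GetSection("RssWatcher")
//           .Get<ConfigSectionRssModel>()
//   };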

// This class was made to work with Hangfire, which does not support constructors,
// so dependencies are wired up through InitAndExecute instead.
public class RssWatcherJob
{
    private const string JobName = "RssWatcherJob";
    private IArticlesRepository _articles;
    private ILogger _logger;
    private IDiscordQueueRepository _queue;
    private ISourcesRepository _source;
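
    // Parameterless constructor for Hangfire. Builds placeholder repositories
    // against empty connection strings; InitAndExecute replaces them with real
    // instances before any work happens.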
    public RssWatcherJob()
    {
        _articles = new ArticlesTable("");
        _queue = new DiscordQueueTable("");
        _source = new SourcesTable("");
        _logger = JobLogger.GetLogger("", JobName);
    }
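
    // Entry point used by the scheduler: rebuilds the repositories and logger from
    // the supplied options, then runs Execute unless the RSS feature flag is off.
    //
    // A minimal sketch (assuming Hangfire's RecurringJob API, a hypothetical
    // "rss-watcher" job id, an hourly cron, and an options instance built as in the
    // sketch above the options class) of how this might be scheduled:
    //
    //   RecurringJob.AddOrUpdate<RssWatcherJob>(
    //       "rss-watcher",
    //       job => job.InitAndExecute(options),
    //       "0 * * * *");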
    public void InitAndExecute(RssWatcherJobOptions options)
    {
        options.ConnectionStrings ??= new ConfigSectionConnectionStrings();
        options.Config ??= new ConfigSectionRssModel();

        _articles = new ArticlesTable(options.ConnectionStrings.Database ?? "");
        _queue = new DiscordQueueTable(options.ConnectionStrings.Database ?? "");
        _source = new SourcesTable(options.ConnectionStrings.Database ?? "");
        _logger = JobLogger.GetLogger(options.ConnectionStrings.OpenTelemetry ?? "", JobName);

        _logger.Information($"{JobName} - Job was triggered");

        if (!options.Config.IsEnabled)
        {
            _logger.Information($"{JobName} - Going to exit because feature flag is off.");
            return;
        }

        _logger.Information($"{JobName} - Setting up the job");
        Execute();
    }
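
    // Walks every RSS source in the database, collects any posts that have not
    // been seen before, and hands the batch off to UpdateDatabase.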
    public void Execute()
    {
        var articles = new List<ArticlesModel>();

        _logger.Information($"{JobName} - Requesting sources");
        var sources = _source.ListByType(SourceTypes.Rss);
        _logger.Information($"{JobName} - Got {sources.Count} back");

        foreach (var source in sources)
        {
            _logger.Information($"{JobName} - Starting to process '{source.Name}'");
            _logger.Information($"{JobName} - Starting to request feed to be processed");
            var results = Collect(source.Url, source.ID);
            _logger.Information($"{JobName} - Collected {results.Count} posts");
            articles.AddRange(results);
        }

        _logger.Information($"{JobName} - Sending posts over to the database");
        UpdateDatabase(articles);
        _logger.Information($"{JobName} - Done!");
    }
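
    // Loads the feed with SyndicationFeed, skips URLs that already exist in the
    // database, scrapes the page for thumbnail and description metadata, and
    // sleeps between requests so the remote site is not hammered.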
    public List<ArticlesModel> Collect(string url, Guid sourceId, int sleep = 3000)
    {
        var collectedPosts = new List<ArticlesModel>();

        using var reader = XmlReader.Create(url);
        var feed = SyndicationFeed.Load(reader);

        foreach (var post in feed.Items.ToList())
        {
            var articleUrl = post.Links[0].Uri.AbsoluteUri;

            // Check if we have seen the url before
            // If we have, skip and save the site bandwidth
            if (IsThisUrlKnown(articleUrl)) continue;

            var meta = new HtmlPageReader(new HtmlPageReaderOptions
            {
                Url = articleUrl
            });
            meta.Parse();

            var article = new ArticlesModel
            {
                Title = post.Title.Text,
                Tags = FetchTags(post),
                URL = articleUrl,
                PubDate = post.PublishDate.DateTime,
                Thumbnail = meta.Data.Header.Image,
                Description = meta.Data.Header.Description,
                SourceID = sourceId
            };
            collectedPosts.Add(article);

            // try to not be too greedy
            Thread.Sleep(sleep);
        }

        return collectedPosts;
    }
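
    // Inserts each collected article and queues a Discord notification for it;
    // records with a blank URL are logged and skipped.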
    public void UpdateDatabase(List<ArticlesModel> items)
    {
        foreach (var item in items)
        {
            if (item.URL is null)
            {
                Log.Warning("RSS Watcher collected a blank url and was skipped.");
                continue;
            }

            var saved = _articles.New(item);
            _queue.New(new DiscordQueueModel
            {
                ArticleID = saved.ID
            });
        }
    }
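
    // Returns true when the articles table already holds a record for this URL.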
    private bool IsThisUrlKnown(string url)
    {
        var isKnown = _articles.GetByUrl(url);
        return isKnown.URL == url;
    }
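
    // Flattens the post's categories into a comma-separated string (the result
    // keeps a trailing comma).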
    private string FetchTags(SyndicationItem post)
    {
        var result = "";
        foreach (var tag in post.Categories) result += $"{tag.Name},";
        return result;
    }
}