Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Commit 1f894f1

Browse files
authored
feat: rss parse and basic workflow (#431)
* refactor(rss): basic concept & cache re-org * refactor(rss): blog create logic wip * refactor(rss): blog create logic wip * refactor(rss): blog create logic wip * chore(blog_rss): naming & error handling * chore: wip
1 parent 14e3bcb commit 1f894f1

File tree

26 files changed

+737
-41
lines changed

26 files changed

+737
-41
lines changed

config/config.exs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,24 @@ config :groupher_server, GroupherServer.Mailer,
134134
adapter: Bamboo.MailgunAdapter,
135135
domain: "mailer.coderplanets.com"
136136

137-
# handle background jobs
138-
config :rihanna,
139-
jobs_table_name: "background_jobs",
140-
producer_postgres_connection: {Ecto, GroupherServer.Repo}
137+
config :groupher_server, :cache,
138+
pool: %{
139+
common: %{
140+
name: :common,
141+
size: 5000,
142+
minutes: 10
143+
},
144+
user_login: %{
145+
name: :user_login,
146+
size: 10_000,
147+
minutes: 10_080
148+
},
149+
blog_rss: %{
150+
name: :blog_rss,
151+
size: 1000,
152+
minutes: 15
153+
}
154+
}
141155

142156
# cron-like job scheduler
143157
config :groupher_server, Helper.Scheduler,
@@ -147,6 +161,11 @@ config :groupher_server, Helper.Scheduler,
147161
{"@daily", {Helper.Scheduler, :archive_artiments, []}}
148162
]
149163

164+
# handle background jobs
165+
config :rihanna,
166+
jobs_table_name: "background_jobs",
167+
producer_postgres_connection: {Ecto, GroupherServer.Repo}
168+
150169
import_config "#{Mix.env()}.exs"
151170

152171
if File.exists?("config/#{Mix.env()}.secret.exs") do

lib/groupher_server/application.ex

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,31 @@
11
defmodule GroupherServer.Application do
22
@moduledoc false
33
use Application
4+
import Helper.Utils, only: [get_config: 2]
5+
6+
alias Helper.Cache
7+
8+
@cache_pool get_config(:cache, :pool)
49

510
# See https://hexdocs.pm/elixir/Application.html
611
# for more information on OTP Applications
712
@spec start(any, any) :: {:error, any} | {:ok, pid}
813
def start(_type, _args) do
914
import Supervisor.Spec
10-
alias Helper.Cache
1115

1216
# Define workers and child supervisors to be supervised
13-
children = [
14-
# Start the PubSub system
15-
{Phoenix.PubSub, name: MyApp.PubSub},
16-
# Start the Ecto repository
17-
supervisor(GroupherServer.Repo, []),
18-
# Start the endpoint when the application starts
19-
supervisor(GroupherServerWeb.Endpoint, []),
20-
# Start your own worker by calling: GroupherServer.Worker.start_link(arg1, arg2, arg3)
21-
# worker(GroupherServer.Worker, [arg1, arg2, arg3]),
22-
worker(Cachex, [:common, Cache.config(:common)], id: :common),
23-
worker(Cachex, [:user_login, Cache.config(:user_login)], id: :user_login),
24-
#
25-
worker(Helper.Scheduler, []),
26-
{Rihanna.Supervisor, [postgrex: GroupherServer.Repo.config()]}
27-
]
17+
children =
18+
[
19+
# Start the PubSub system
20+
{Phoenix.PubSub, name: MyApp.PubSub},
21+
# Start the Ecto repository
22+
supervisor(GroupherServer.Repo, []),
23+
# Start the endpoint when the application starts
24+
supervisor(GroupherServerWeb.Endpoint, []),
25+
# Start your own worker by calling: GroupherServer.Worker.start_link(arg1, arg2, arg3)
26+
worker(Helper.Scheduler, []),
27+
{Rihanna.Supervisor, [postgrex: GroupherServer.Repo.config()]}
28+
] ++ cache_workers()
2829

2930
# See https://hexdocs.pm/elixir/Supervisor.html
3031
# for other strategies and supported options
@@ -38,4 +39,19 @@ defmodule GroupherServer.Application do
3839
GroupherServerWeb.Endpoint.config_change(changed, removed)
3940
:ok
4041
end
42+
43+
defp cache_workers() do
44+
import Supervisor.Spec
45+
46+
# worker(GroupherServer.Worker, [arg1, arg2, arg3]),
47+
# worker(Cachex, [:common, Cache.config(:common)], id: :common),
48+
# worker(Cachex, [:user_login, Cache.config(:user_login)], id: :user_login),
49+
# worker(Cachex, [:blog_rss, Cache.config(:blog_rss)], id: :blog_rss),
50+
@cache_pool
51+
|> Map.keys()
52+
|> Enum.reduce([], fn key, acc ->
53+
name = @cache_pool[key].name
54+
acc ++ [worker(Cachex, [name, Cache.config(key)], id: name)]
55+
end)
56+
end
4157
end

lib/groupher_server/cms/cms.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ defmodule GroupherServer.CMS do
1010
alias Delegate.{
1111
AbuseReport,
1212
ArticleCURD,
13+
BlogCURD,
1314
ArticleCommunity,
1415
ArticleEmotion,
1516
CitedArtiment,
@@ -102,6 +103,11 @@ defmodule GroupherServer.CMS do
102103

103104
defdelegate archive_articles(thread), to: ArticleCURD
104105

106+
defdelegate create_blog(community, attrs, user), to: BlogCURD
107+
defdelegate create_blog_rss(attrs), to: BlogCURD
108+
defdelegate update_blog_rss(attrs), to: BlogCURD
109+
defdelegate blog_rss_info(rss), to: BlogCURD
110+
105111
defdelegate paged_citing_contents(type, id, filter), to: CitedArtiment
106112

107113
defdelegate upvote_article(thread, article_id, user), to: ArticleUpvote
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
defmodule GroupherServer.CMS.Delegate.BlogCURD do
2+
@moduledoc """
3+
CURD operation on post/job ...
4+
"""
5+
import Ecto.Query, warn: false
6+
import Helper.Utils, only: [strip_struct: 1, done: 1]
7+
import Helper.ErrorCode
8+
9+
import GroupherServer.CMS.Delegate.ArticleCURD, only: [create_article: 4]
10+
# import Helper.Utils, only: [done: 1]
11+
12+
# import Helper.ErrorCode
13+
# import ShortMaps
14+
15+
# alias Helper.{ORM}
16+
alias GroupherServer.{Accounts, CMS, Repo}
17+
alias CMS.Model.{BlogRSS, Community}
18+
alias Accounts.Model.User
19+
20+
alias Helper.{ORM, Cache, RSS}
21+
22+
@cache_pool :blog_rss
23+
24+
# alias Ecto.Multi
25+
def blog_rss_info(rss) when is_binary(rss) do
26+
with {:ok, feed} <- ORM.find_by(BlogRSS, %{rss: rss}) do
27+
{:ok, feed}
28+
else
29+
_ -> fetch_fresh_rssinfo_and_cache(rss)
30+
end
31+
end
32+
33+
# attrs 包含 rss, blog_title
34+
# def create_article(%Community{id: cid}, thread, attrs, %User{id: uid}) do
35+
def create_blog(%Community{} = community, attrs, %User{} = user) do
36+
# 1. 先判断 rss 是否存在
37+
## 1.1 如果存在,从 cache 中获取
38+
## 1.2 如不存在,则创建一条 RSS
39+
with {:ok, feed} <- blog_rss_info(attrs.rss) do
40+
do_create_blog(community, attrs, user, feed)
41+
42+
# IO.inspect(feed, label: "create blog")
43+
# 通过 feed 有没有 id 来 insert / update
44+
# 通过 blog_title, 组合 attrs 传给 create_article
45+
end
46+
47+
# 2. 创建 blog
48+
## 2.1 blog +字段 rss, author
49+
## 2.2 title, digest, xxx
50+
51+
# 前台获取作者信息的时候从 rss 表读取
52+
end
53+
54+
# rss 记录存在, 直接创建 blog
55+
defp do_create_blog(%Community{} = community, attrs, %User{} = user, %{id: _} = feed) do
56+
blog_author = if is_nil(feed.author), do: nil, else: Map.from_struct(feed.author)
57+
selected_feed = Enum.find(feed.history_feed, &(&1.title == attrs.title))
58+
59+
# TODO: feed_digest, feed_content
60+
attrs =
61+
attrs
62+
|> Map.merge(%{
63+
link_addr: selected_feed.link_addr,
64+
published: selected_feed.published,
65+
blog_author: blog_author
66+
})
67+
|> Enum.reject(fn {_, v} -> is_nil(v) end)
68+
|> Map.new()
69+
70+
create_article(community, :blog, attrs, user)
71+
end
72+
73+
# rss 记录不存在, 先创建 rss, 再创建 blog
74+
defp do_create_blog(%Community{} = community, attrs, %User{} = user, feed) do
75+
with {:ok, feed} <- CMS.blog_rss_info(attrs.rss),
76+
{:ok, feed} <- create_blog_rss(feed) do
77+
do_create_blog(community, attrs, user, feed)
78+
end
79+
end
80+
81+
def create_blog_rss(attrs) do
82+
history_feed = Map.get(attrs, :history_feed)
83+
attrs = attrs |> Map.drop([:history_feed])
84+
85+
%BlogRSS{}
86+
|> Ecto.Changeset.change(attrs)
87+
|> Ecto.Changeset.put_embed(:history_feed, history_feed)
88+
|> Repo.insert()
89+
end
90+
91+
def update_blog_rss(%{rss: rss} = attrs) do
92+
with {:ok, blog_rss} <- ORM.find_by(BlogRSS, rss: rss) do
93+
history_feed =
94+
Map.get(attrs, :history_feed, Enum.map(blog_rss.history_feed, &strip_struct(&1)))
95+
96+
attrs = attrs |> Map.drop([:history_feed])
97+
98+
%BlogRSS{}
99+
|> Ecto.Changeset.change(attrs)
100+
|> Ecto.Changeset.put_embed(:history_feed, history_feed)
101+
|> Repo.insert()
102+
end
103+
end
104+
105+
# create done
106+
# defp result({:ok, %{set_active_at_timestamp: result}}) do
107+
# {:ok, result}
108+
# end
109+
110+
# defp result({:ok, %{update_article_meta: result}}), do: {:ok, result}
111+
112+
# defp result({:error, :create_article, _result, _steps}) do
113+
# {:error, [message: "create article", code: ecode(:create_fails)]}
114+
# end
115+
116+
# defp result({:error, _, result, _steps}), do: {:error, result}
117+
118+
@doc """
119+
get and cache feed by rss address as key
120+
"""
121+
def fetch_fresh_rssinfo_and_cache(rss) do
122+
case Cache.get(@cache_pool, rss) do
123+
{:ok, rssinfo} -> {:ok, rssinfo}
124+
{:error, _} -> get_rssinfo_and_cache(rss)
125+
end
126+
end
127+
128+
defp get_rssinfo_and_cache(rss) do
129+
# {:ok, feed} = RSS.get(rss)
130+
with {:ok, rssinfo} <- RSS.get(rss) do
131+
Cache.put(@cache_pool, rss, rssinfo)
132+
{:ok, rssinfo}
133+
else
134+
{:error, _} -> {:error, [message: "blog rss is invalid", code: ecode(:invalid_blog_rss)]}
135+
end
136+
end
137+
end

lib/groupher_server/cms/models/blog.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@ defmodule GroupherServer.CMS.Model.Blog do
1515

1616
@required_fields ~w(title digest)a
1717
@article_cast_fields general_article_cast_fields()
18-
@optional_fields ~w(digest)a ++ @article_cast_fields
18+
@optional_fields ~w(digest feed_digest feed_content published)a ++ @article_cast_fields
1919

2020
@type t :: %Blog{}
2121
schema "cms_blogs" do
2222
# for frontend constant
2323
field(:copy_right, :string, default: "", virtual: true)
2424

25+
field(:feed_digest, :string)
26+
field(:feed_content, :string)
27+
field(:published, :string)
28+
embeds_one(:blog_author, Embeds.BlogAuthor, on_replace: :update)
29+
2530
article_tags_field(:blog)
2631
article_communities_field(:blog)
2732
general_article_fields(:blog)
@@ -33,13 +38,15 @@ defmodule GroupherServer.CMS.Model.Blog do
3338
|> cast(attrs, @optional_fields ++ @required_fields)
3439
|> validate_required(@required_fields)
3540
|> cast_embed(:meta, required: false, with: &Embeds.ArticleMeta.changeset/2)
41+
|> cast_embed(:blog_author, required: false, with: &Embeds.BlogAuthor.changeset/2)
3642
|> generl_changeset
3743
end
3844

3945
@doc false
4046
def update_changeset(%Blog{} = blog, attrs) do
4147
blog
4248
|> cast(attrs, @optional_fields ++ @required_fields)
49+
|> cast_embed(:blog_author, required: false, with: &Embeds.BlogAuthor.changeset/2)
4350
|> generl_changeset
4451
end
4552

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
defmodule GroupherServer.CMS.Model.BlogRSS do
2+
@moduledoc false
3+
alias __MODULE__
4+
5+
use Ecto.Schema
6+
use Accessible
7+
8+
import Ecto.Changeset
9+
# import GroupherServer.CMS.Helper.Macros
10+
11+
alias GroupherServer.CMS
12+
alias CMS.Model.Embeds
13+
14+
@timestamps_opts [type: :utc_datetime_usec]
15+
16+
@required_fields ~w(link rss)a
17+
@optional_fields ~w(subtitle author updated)a
18+
19+
@type t :: %BlogRSS{}
20+
schema "cms_blog_rss" do
21+
field(:rss, :string)
22+
field(:title, :string)
23+
field(:subtitle, :string)
24+
field(:link, :string)
25+
field(:updated, :string)
26+
embeds_many(:history_feed, Embeds.BlogHistoryFeed, on_replace: :delete)
27+
embeds_one(:author, Embeds.BlogAuthor, on_replace: :update)
28+
end
29+
30+
@doc false
31+
def changeset(%BlogRSS{} = blog_rss, attrs) do
32+
blog_rss
33+
|> cast(attrs, @optional_fields ++ @required_fields)
34+
|> validate_required(@required_fields)
35+
|> cast_embed(:history_feed, required: true, with: &Embeds.BlogHistoryFeed.changeset/2)
36+
|> cast_embed(:author, required: false, with: &Embeds.BlogAuthor.changeset/2)
37+
end
38+
39+
@doc false
40+
def update_changeset(%BlogRSS{} = blog_rss, attrs) do
41+
blog_rss
42+
|> cast(attrs, @optional_fields ++ @required_fields)
43+
|> cast_embed(:history_feed, required: false, with: &Embeds.BlogHistoryFeed.changeset/2)
44+
|> cast_embed(:author, required: false, with: &Embeds.BlogAuthor.changeset/2)
45+
end
46+
47+
# @doc false
48+
# def update_changeset(%BlogRSS{} = blog_rss, attrs) do
49+
# blog_rss
50+
# |> cast(attrs, @optional_fields ++ @required_fields)
51+
# |> generl_changeset
52+
# end
53+
54+
# defp generl_changeset(changeset) do
55+
# changeset
56+
# |> validate_length(:title, min: 3, max: 100)
57+
# |> cast_embed(:emotions, with: &Embeds.ArticleEmotion.changeset/2)
58+
# |> validate_length(:link_addr, min: 5, max: 400)
59+
# end
60+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
defmodule GroupherServer.CMS.Model.Embeds.BlogAuthor do
2+
@moduledoc """
3+
general community meta
4+
"""
5+
use Ecto.Schema
6+
use Accessible
7+
8+
import Ecto.Changeset
9+
10+
@required_fields ~w(name)a
11+
@optional_fields ~w(link intro github twitter)a
12+
13+
embedded_schema do
14+
field(:name, :string)
15+
field(:link, :string)
16+
field(:intro, :string)
17+
field(:github, :string)
18+
field(:twitter, :string)
19+
end
20+
21+
def changeset(struct, attrs) do
22+
struct
23+
|> cast(attrs, @optional_fields ++ @required_fields)
24+
|> validate_required(@required_fields)
25+
end
26+
end

0 commit comments

Comments
 (0)