Skip to content

Commit 0711e78

Browse files
committed
feat: optimized viewer msg sending with parallelism
1 parent 7a65f66 commit 0711e78

File tree

12 files changed

+279
-72
lines changed

12 files changed

+279
-72
lines changed

app/discorder/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"darkbot/app/settings"
1212
"darkbot/app/settings/logus"
1313
"darkbot/app/settings/types"
14+
"darkbot/app/settings/utils"
15+
"fmt"
1416
"time"
1517

1618
"github.com/bwmarrin/discordgo"
@@ -41,8 +43,12 @@ func (d *Discorder) SengMessage(channelID types.DiscordChannelID, content string
4143
}
4244

4345
func (d *Discorder) EditMessage(channelID types.DiscordChannelID, messageID types.DiscordMessageID, content string) error {
44-
_, err := d.dg.ChannelMessageEdit(string(channelID), string(messageID), content)
45-
logus.CheckWarn(err, "failed editing message in discorder", logus.ChannelID(channelID))
46+
var err error
47+
utils.TimeMeasure(func() {
48+
msg, err := d.dg.ChannelMessageEdit(string(channelID), string(messageID), content)
49+
logus.CheckWarn(err, "failed editing message in discorder", logus.ChannelID(channelID))
50+
logus.Debug(fmt.Sprintf("Discorder.EditMessage.msg=%v", msg))
51+
}, fmt.Sprintf("Discorder.EditMessage content=%s", content), logus.ChannelID(channelID), logus.MessageID(messageID))
4652
return err
4753
}
4854

app/forumer/run_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"darkbot/app/forumer/forum_types"
66
"darkbot/app/settings/logus"
77
"darkbot/app/settings/types"
8-
"os"
8+
"darkbot/app/settings/utils"
99
"testing"
1010
)
1111

@@ -26,10 +26,6 @@ func newMockedThreadsQuery() MockedThreadsQuery {
2626
return MockedThreadsQuery{threads: one_thread}
2727
}
2828

29-
func FixtureDevEnv() bool {
30-
return os.Getenv("DEV_ENV") == "true"
31-
}
32-
3329
func TestForumerSending(t *testing.T) {
3430

3531
mocked_post_requester := FixtureDetailedRequester()
@@ -40,7 +36,7 @@ func TestForumerSending(t *testing.T) {
4036

4137
cg_channel := configurator.NewConfiguratorChannel(configurator.NewConfigurator(dbpath))
4238

43-
if FixtureDevEnv() {
39+
if utils.FixtureDevEnv() {
4440
cg_channel.Add(dev_env_channel)
4541
}
4642

@@ -64,7 +60,7 @@ func TestSubForumSending(t *testing.T) {
6460

6561
cg_channel := configurator.NewConfiguratorChannel(configurator.NewConfigurator(dbpath))
6662

67-
if FixtureDevEnv() {
63+
if utils.FixtureDevEnv() {
6864
cg_channel.Add(dev_env_channel)
6965
}
7066

app/settings/types/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ type APIurl string
1111
type ScrappyLoopDelay int
1212
type ViewerLoopDelay int
1313

14+
type ViewerDelayBetweenChannels int
15+
1416
type DiscordChannelID string
1517

1618
type DiscordMessageID string

app/settings/utils/fixtures.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package utils
2+
3+
import "os"
4+
5+
func FixtureDevEnv() bool {
6+
return os.Getenv("DEV_ENV") == "true"
7+
}

app/settings/utils/time.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package utils
2+
3+
import (
4+
"darkbot/app/settings/logus"
5+
"fmt"
6+
"time"
7+
)
8+
9+
type timeMeasurer struct {
10+
msg string
11+
ops []logus.SlogParam
12+
time_started time.Time
13+
}
14+
15+
func NewTimeMeasure(msg string, ops ...logus.SlogParam) *timeMeasurer {
16+
return &timeMeasurer{
17+
msg: msg,
18+
ops: ops,
19+
time_started: time.Now(),
20+
}
21+
}
22+
23+
func (t *timeMeasurer) Close() {
24+
logus.Debug(fmt.Sprintf("time_measure %v | %s", time.Since(t.time_started), t.msg), t.ops...)
25+
}
26+
27+
func TimeMeasure(callback func(), msg string, ops ...logus.SlogParam) {
28+
time_started := NewTimeMeasure(msg, ops...)
29+
defer time_started.Close()
30+
callback()
31+
}

app/settings/worker/worker_temp.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func NewTask(id worker_types.TaskID) *Task {
3030
const (
3131
CodeSuccess worker_types.TaskStatusCode = 0
3232
CodeTimeout worker_types.TaskStatusCode = 1
33+
CodeFailure worker_types.TaskStatusCode = 2
3334
)
3435

3536
type TaskPool[taskT ITask] struct {
@@ -74,11 +75,11 @@ func NewTaskPool[T ITask](opts ...TaskPoolOption[T]) *TaskPool[T] {
7475
}
7576

7677
func (j *TaskPool[taskT]) launchWorker(worker_id worker_types.WorkerID, tasks <-chan taskT, results chan<- worker_types.TaskStatusCode) {
77-
logus.Debug("worker started", worker_logus.WorkerID(worker_id))
78+
logus.Info("worker started", worker_logus.WorkerID(worker_id))
7879
for task := range tasks {
7980
results <- task.RunTask(worker_id)
8081
}
81-
logus.Debug("worker finished", worker_logus.WorkerID(worker_id))
82+
logus.Info("worker finished", worker_logus.WorkerID(worker_id))
8283
}
8384

8485
/// Temporal

app/viewer/channel.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"darkbot/app/discorder"
55
"darkbot/app/settings/logus"
66
"darkbot/app/settings/types"
7+
"darkbot/app/settings/utils"
78
"darkbot/app/viewer/apis"
89
"darkbot/app/viewer/views"
910
"darkbot/app/viewer/views/baseview"
1011
"darkbot/app/viewer/views/eventview"
1112
"darkbot/app/viewer/views/playerview"
13+
"fmt"
1214
"strings"
1315
"time"
1416
)
@@ -61,8 +63,10 @@ func (v *ChannelView) Render() {
6163
// Edit if message ID is present.
6264
// Send if not present.
6365
func (v ChannelView) Send() {
64-
for _, view := range v.views {
65-
view.Send()
66+
for view_num, view := range v.views {
67+
utils.TimeMeasure(func() {
68+
view.Send()
69+
}, fmt.Sprintf("view.Send view_num=%d, view=%v", view_num, view), logus.ChannelID(v.ChannelID))
6670
}
6771
}
6872

app/viewer/task.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package viewer
2+
3+
import (
4+
"darkbot/app/settings/logus"
5+
"darkbot/app/settings/types"
6+
"darkbot/app/settings/utils"
7+
"darkbot/app/settings/worker"
8+
"darkbot/app/settings/worker/worker_types"
9+
"darkbot/app/viewer/apis"
10+
"fmt"
11+
"sync"
12+
"time"
13+
)
14+
15+
type TaskRefreshChannel struct {
16+
*worker.Task
17+
18+
// any desired arbitary data
19+
api *apis.API
20+
channelID types.DiscordChannelID
21+
delayBetweenChannels types.ViewerDelayBetweenChannels
22+
}
23+
24+
func NewRefreshChannelTask(
25+
api *apis.API,
26+
channelID types.DiscordChannelID,
27+
delayBetweenChannels types.ViewerDelayBetweenChannels,
28+
) *TaskRefreshChannel {
29+
task_id_gen += 1
30+
return &TaskRefreshChannel{
31+
Task: worker.NewTask(worker_types.TaskID(task_id_gen)),
32+
api: api,
33+
channelID: channelID,
34+
delayBetweenChannels: delayBetweenChannels,
35+
}
36+
}
37+
38+
var task_id_gen int = 0
39+
40+
var guildAntiRateLimitMutexes map[string]*sync.Mutex
41+
42+
func init() {
43+
guildAntiRateLimitMutexes = make(map[string]*sync.Mutex)
44+
}
45+
46+
func GetMutex(MutexKey string) *sync.Mutex {
47+
value, ok := guildAntiRateLimitMutexes[MutexKey]
48+
49+
if ok {
50+
return value
51+
}
52+
53+
new_mutex := &sync.Mutex{}
54+
guildAntiRateLimitMutexes[MutexKey] = new_mutex
55+
return new_mutex
56+
}
57+
58+
func (v *TaskRefreshChannel) RunTask(worker_id worker_types.WorkerID) worker_types.TaskStatusCode {
59+
channel_info, err := v.api.Discorder.GetDiscordSession().Channel(string(v.channelID))
60+
if logus.CheckError(err, "unable to get channel info") {
61+
return worker.CodeFailure
62+
}
63+
64+
MutexKey := channel_info.GuildID
65+
GuildMutex := GetMutex(MutexKey)
66+
GuildMutex.Lock()
67+
defer GuildMutex.Unlock()
68+
69+
time_run_task_started := time.Now()
70+
time_new_channel := utils.NewTimeMeasure("new_channel", logus.ChannelID(v.channelID))
71+
channel := NewChannelView(v.api, v.channelID)
72+
73+
time_new_channel.Close()
74+
75+
time_render := utils.NewTimeMeasure("channel.Render", logus.ChannelID(v.channelID))
76+
channel.Render()
77+
time_render.Close()
78+
79+
time_discover := utils.NewTimeMeasure("channel.Discover", logus.ChannelID(v.channelID))
80+
err = channel.Discover()
81+
time_discover.Close()
82+
83+
if logus.CheckWarn(err, "unable to grab Discord msgs", logus.ChannelID(v.channelID)) {
84+
return worker.CodeFailure
85+
}
86+
87+
time_send := utils.NewTimeMeasure("channel.Send", logus.ChannelID(v.channelID))
88+
channel.Send()
89+
time_send.Close()
90+
91+
time_delete_old := utils.NewTimeMeasure("channel.DeleteOld", logus.ChannelID(v.channelID))
92+
channel.DeleteOld()
93+
time_delete_old.Close()
94+
v.SetAsDone()
95+
logus.Info(fmt.Sprintf("RunTask finished, TaskID=%d, elapsed=%s, started_at=%s, finished_at=%s",
96+
v.Task.GetID(),
97+
time.Since(time_run_task_started).String(),
98+
time_run_task_started.String(),
99+
time.Now().String(),
100+
))
101+
102+
// Important for Mutex above! Prevents Guild level rate limits. looks like 5 msg edits per 5 second at one server is good
103+
time.Sleep(time.Duration(v.delayBetweenChannels) * time.Second)
104+
return worker.CodeSuccess
105+
}

app/viewer/viewer.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,72 @@ import (
55
"darkbot/app/settings"
66
"darkbot/app/settings/logus"
77
"darkbot/app/settings/types"
8+
"darkbot/app/settings/utils"
9+
"darkbot/app/settings/worker"
810
"darkbot/app/viewer/apis"
911
"time"
1012
)
1113

1214
type ViewerDelays struct {
13-
betweenChannels int
15+
betweenChannels types.ViewerDelayBetweenChannels
1416
betweenLoops types.ViewerLoopDelay
1517
}
1618

1719
type Viewer struct {
18-
delays ViewerDelays
19-
api *apis.API
20+
delays ViewerDelays
21+
api *apis.API
22+
workers *worker.TaskPoolPeristent[*TaskRefreshChannel]
2023
}
2124

2225
func NewViewer(dbpath types.Dbpath, scrappy_storage *scrappy.ScrappyStorage) *Viewer {
2326
api := apis.NewAPI(dbpath, scrappy_storage)
24-
return &Viewer{
27+
v := &Viewer{
2528
api: api,
2629
delays: ViewerDelays{
27-
betweenChannels: 1,
30+
betweenChannels: 10,
2831
betweenLoops: settings.ViewerLoopDelay,
2932
},
3033
}
34+
35+
v.workers = worker.NewTaskPoolPersistent[*TaskRefreshChannel](
36+
worker.WithAllowFailedTasks[*TaskRefreshChannel](),
37+
worker.WithDisableParallelism[*TaskRefreshChannel](false),
38+
worker.WithWorkersAmount[*TaskRefreshChannel](10),
39+
)
40+
41+
return v
3142
}
3243

3344
func (v *Viewer) Run() {
3445
logus.Info("Viewer is now running.")
3546

47+
go func() {
48+
for {
49+
v.workers.AwaitSomeTask()
50+
}
51+
}()
3652
for {
3753
v.Update()
3854
}
3955
}
4056

41-
func (v Viewer) Update() {
57+
func (v *Viewer) Update() {
58+
time_viewer_started := time.Now()
4259
logus.Info("Viewer.Update")
4360

44-
// Query all channels
4561
channelIDs, _ := v.api.Channels.List()
4662
logus.Info("Viewer.Update.channelIDs=", logus.ChannelIDs(channelIDs))
4763

4864
// For each channel
65+
allChannelsTime := utils.NewTimeMeasure("all channels")
4966
for _, channelID := range channelIDs {
50-
channel := NewChannelView(v.api, channelID)
51-
channel.Render()
52-
err := channel.Discover()
53-
if logus.CheckWarn(err, "unable to grab Discord msgs", logus.ChannelID(channelID)) {
54-
continue
55-
}
56-
channel.Send()
57-
channel.DeleteOld()
58-
time.Sleep(time.Duration(v.delays.betweenChannels) * time.Second)
67+
utils.TimeMeasure(func() {
68+
task := NewRefreshChannelTask(v.api, channelID, v.delays.betweenChannels)
69+
// task.RunTask(worker_types.WorkerID(0))
70+
v.workers.DelayTask(task)
71+
}, "one channel", logus.ChannelID(channelID))
5972
}
73+
allChannelsTime.Close()
74+
logus.Info("Viewer.Update Finished " + time.Since(time_viewer_started).String())
6075
time.Sleep(time.Duration(v.delays.betweenLoops) * time.Second)
6176
}

app/viewer/viewer_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package viewer
2+
3+
import (
4+
"darkbot/app/settings/utils"
5+
"testing"
6+
)
7+
8+
func TestDebugPerformance(t *testing.T) {
9+
if !utils.FixtureDevEnv() {
10+
return
11+
}
12+
13+
}

0 commit comments

Comments
 (0)