-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtaskQueue.go
More file actions
113 lines (94 loc) · 1.94 KB
/
taskQueue.go
File metadata and controls
113 lines (94 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package scraper
import (
"bdo-rest-api/utils"
"sync"
"time"
)
// Task is a single unit of work carried through a TaskQueue and handed to the
// queue's process function.
type Task struct {
	// AddedAt is intended to hold the time the task was enqueued
	// (NOTE(review): confirm which producers actually set it).
	AddedAt time.Time

	// ClientIP identifies the client on whose behalf the task was queued.
	ClientIP string

	// Hash is the key the queue uses to detect duplicate pending tasks.
	Hash string

	// URL is the request URL associated with the task.
	URL string
}
// TaskQueue is a buffered, pausable queue of Tasks drained by a single
// background goroutine. It tracks pending task hashes to reject duplicates and
// keeps a per-client count of tasks that have been added but not yet confirmed
// as complete. Create one with NewTaskQueue; it must not be copied because it
// embeds a sync.Mutex.
type TaskQueue struct {
	// clientIPs maps a client IP to its number of added-but-unconfirmed tasks.
	clientIPs map[string]int

	// cond is bound to mutex; the worker waits on it while paused is true.
	cond *sync.Cond

	// hashSet holds the hashes of tasks that were added and not yet confirmed.
	hashSet map[string]struct{}

	// mutex guards clientIPs, hashSet, paused and processFunc.
	mutex sync.Mutex

	// paused makes the worker hold the task it has received until it is cleared.
	paused bool

	// processFunc is the handler invoked by the worker for each task; may be nil.
	processFunc func(Task)

	// tasks is the buffered channel that feeds the worker goroutine.
	tasks chan Task
}
// NewTaskQueue builds a TaskQueue whose task channel can buffer bufferSize
// tasks and starts the background goroutine that drains it.
//
// The returned queue has no process function; install one with SetProcessFunc.
// The worker goroutine runs until the task channel is closed.
func NewTaskQueue(bufferSize int) *TaskQueue {
	q := new(TaskQueue)
	q.clientIPs = map[string]int{}
	q.hashSet = map[string]struct{}{}
	q.tasks = make(chan Task, bufferSize)

	// The condition variable shares the queue's mutex so the worker can wait
	// on the paused flag atomically.
	q.cond = sync.NewCond(&q.mutex)

	go q.run()
	return q
}
// AddTask enqueues a task for url on behalf of clientIP unless a task with the
// same hash is already pending.
//
// The task URL is produced by utils.BuildRequest from url together with the
// taskAddedAt (RFC 3339), taskClient and taskHash entries.
//
// It returns false, without enqueueing anything, when hash is already
// registered. Otherwise it registers the hash, increments the pending count
// for clientIP, sends the task to the worker and returns true. The send blocks
// while the queue's buffer is full.
func (q *TaskQueue) AddTask(clientIP, hash, url string) (added bool) {
	// Take one timestamp so the URL parameter and Task.AddedAt always agree.
	addedAt := time.Now()

	fullURL := utils.BuildRequest(url, map[string]string{
		"taskAddedAt": addedAt.Format(time.RFC3339),
		"taskClient":  clientIP,
		"taskHash":    hash,
	})

	q.mutex.Lock()
	if _, exists := q.hashSet[hash]; exists {
		q.mutex.Unlock()
		return false
	}
	q.hashSet[hash] = struct{}{}
	q.clientIPs[clientIP]++
	q.mutex.Unlock()

	// Send outside the critical section: a full buffer must not block other
	// callers that only need the mutex.
	q.tasks <- Task{
		AddedAt:  addedAt,
		ClientIP: clientIP,
		Hash:     hash,
		URL:      fullURL,
	}
	return true
}
// run is the worker loop started by NewTaskQueue. It receives tasks one at a
// time, waits while the queue is paused, and then passes the task to the
// currently installed process function. A task received while no process
// function is installed is discarded. The loop ends when the task channel is
// closed.
func (q *TaskQueue) run() {
	for {
		next, open := <-q.tasks
		if !open {
			return
		}

		q.mutex.Lock()
		// Wait re-checks the flag in a loop because a wake-up does not
		// guarantee the queue is still unpaused.
		for q.paused {
			q.cond.Wait()
		}
		// Snapshot the handler under the lock, then call it unlocked so a
		// slow handler never blocks AddTask or Pause.
		handler := q.processFunc
		q.mutex.Unlock()

		if handler == nil {
			continue
		}
		handler(next)
	}
}
// SetProcessFunc installs f as the handler the worker calls for each task.
// The worker reads the handler once per task, so the change applies to tasks
// picked up afterwards. Passing nil makes the worker discard tasks.
func (q *TaskQueue) SetProcessFunc(f func(Task)) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	q.processFunc = f
}
// Pause stops the worker from handing tasks to the process function for the
// duration t, then resumes it. The call blocks the caller for t.
//
// A task whose handler is already running is not interrupted. The pause state
// is a single flag, so when calls overlap, the first one to finish resumes the
// worker even if another pause is still sleeping.
func (q *TaskQueue) Pause(t time.Duration) {
	setPaused := func(state bool) {
		q.mutex.Lock()
		q.paused = state
		q.mutex.Unlock()
	}

	setPaused(true)
	// Sleep without holding the mutex so AddTask and friends keep working.
	time.Sleep(t)
	setPaused(false)

	// Wake the worker if it is parked waiting for the pause to end.
	q.cond.Broadcast()
}
// CountQueuedTasksForClient reports how many tasks added for clientIP have not
// yet been confirmed via ConfirmTaskCompletion. The result is never negative
// and is 0 for a client the queue has no record of.
func (q *TaskQueue) CountQueuedTasksForClient(clientIP string) (count int) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if pending := q.clientIPs[clientIP]; pending > 0 {
		return pending
	}
	return 0
}
// ConfirmTaskCompletion records that the task identified by hash, queued on
// behalf of clientIP, is finished. It decrements the client's pending count
// (never below zero) and removes hash from the pending set so that the same
// hash can be queued again.
//
// Calling it for an unknown client or hash is a harmless no-op.
func (q *TaskQueue) ConfirmTaskCompletion(clientIP string, hash string) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if pending := q.clientIPs[clientIP]; pending > 1 {
		q.clientIPs[clientIP] = pending - 1
	} else {
		// Last pending task (or no record at all): drop the entry instead of
		// keeping a zero counter, otherwise the map gains one permanent key
		// per distinct client IP. A missing key still reads as 0.
		delete(q.clientIPs, clientIP)
	}

	delete(q.hashSet, hash)
}