Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
# Created by `dart pub`
.dart_tool/
example/pubspec.lock
pubspec_overrides.yaml
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.3.2

- Fix isolates handling

## 0.3.1

- Fix example in readme
Expand Down
16 changes: 14 additions & 2 deletions example/utopia_queue_example.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
import 'dart:io';

import 'package:utopia_queue/utopia_queue.dart';

void main(List<String> arguments) async {
final connection = await ConnectionRedis.init('localhost', 6379);
final Server server = Server(connection, queue: 'myqueue');

server.job().inject('message').action((Message message) {
server.setResource('res1', () {
return 'hello res 1';
});

server
.job()
.inject('message')
.inject('res1')
.action((Message message, String res1) {
print('res1: $res1');
sleep(Duration(seconds: 2));
print(message.toMap());
});
server.start(threads: 2);
server.start(threads: 3);
}
25 changes: 25 additions & 0 deletions lib/src/isolate_message.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import 'dart:isolate';

import 'package:utopia_di/utopia_di.dart';

import 'job.dart';

class IsolateMessage {
final int id;
final SendPort sendPort;
final List<Hook> errors;
final List<Hook> init;
final List<Hook> shutdown;
final DI di;
final Job job;

IsolateMessage({
required this.id,
required this.sendPort,
required this.errors,
required this.init,
required this.shutdown,
required this.di,
required this.job,
});
}
56 changes: 56 additions & 0 deletions lib/src/isolate_supervisor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import 'dart:developer' as dev;
import 'dart:isolate';

import '../utopia_queue.dart';

enum IsolateStatus {
working,
idle,
paused,
stopped,
}

class IsolateSupervisor {
final Isolate isolate;
final ReceivePort receivePort;
final int id;
SendPort? isolateSendPort;
Function(Message) onError;
IsolateStatus _status = IsolateStatus.paused;

bool get isBusy => _status == IsolateStatus.working;

static const String messageClose = '_CLOSE';

IsolateSupervisor({
required this.isolate,
required this.receivePort,
required this.id,
required this.onError,
});

void resume() {
receivePort.listen(listener);
isolate.resume(isolate.pauseCapability!);
_status = IsolateStatus.idle;
}

void stop() {
dev.log('Stopping isolate $id', name: 'FINE');
isolateSendPort?.send(messageClose);
_status = IsolateStatus.stopped;
receivePort.close();
}

void listener(dynamic message) async {
if (message is SendPort) {
isolateSendPort = message;
} else if (message is Map) {
if (message['type'] == 'error') {
onError.call(message['message']);
} else if (message['type'] == 'status') {
_status = message['status'];
}
}
}
}
175 changes: 130 additions & 45 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import 'dart:developer' as dev;
import 'dart:isolate' as iso;

import 'package:utopia_di/utopia_di.dart';
import 'package:utopia_queue/src/isolate_message.dart';
import 'package:utopia_queue/src/isolate_supervisor.dart';
import 'package:utopia_queue/src/message.dart';

import 'connection.dart';
Expand All @@ -17,7 +21,7 @@ class Server {
final List<Hook> _errors = [];
final List<Hook> _init = [];
final List<Hook> _shutdown = [];
static Map<String, int> threads = {};
final List<IsolateSupervisor> _supervisors = [];

Job _job = Job();

Expand Down Expand Up @@ -74,10 +78,20 @@ class Server {
return hook;
}

Future<void> _onIsolateMain((Connection, int) args) async {
final (connection, id) = args;
print('Server $id waiting for queue');
Future<void> _watchQueue() async {
while (true) {
IsolateSupervisor? worker;
for (final sup in _supervisors) {
if (sup.isBusy) {
continue;
}
worker = sup;
break;
}

if (worker == null) {
continue;
}
var nextMessage =
await connection.rightPopJson('$namespace.queue.$queue', 5);

Expand All @@ -87,56 +101,63 @@ class Server {

final message = Message.fromMap(nextMessage);
setResource('message', () => message);
print('$id: Job received ${message.pid}');

try {
final groups = _job.getGroups();
if (_job.hook) {
await _executeHooks(
_init,
groups,
(hook) => _getArguments(hook, message.payload),
globalHook: true,
);
}
final args = _getArguments(_job, message.payload);
await Function.apply(
_job.getAction(), [..._job.argsOrder.map((key) => args[key])]);
if (_job.hook) {
await _executeHooks(
_shutdown,
groups,
(hook) => _getArguments(hook, message.payload),
globalHook: true,
);
}
print('$id: Job ${message.pid} successfully run');
} catch (e) {
await connection.leftPush('$namespace.failed.$queue', message.pid);
print('$id: Error: Job ${message.pid} failed to run');
print('$id: Error: ${e.toString()}');
setResource('error', () => e);
_executeHooks(
_errors,
[],
(hook) => _getArguments(hook, message.payload),
);
}
worker.isolateSendPort?.send(message);
}
}

Future<void> _spawnOffIsolates(int num) async {
void _onError(Message message) {
connection.leftPushJson('$namespace.failed.$queue', message.toMap());
}

Future<void> _spawn(int num) async {
_supervisors.clear();
for (var i = 0; i < num; i++) {
await iso.Isolate.spawn<(Connection, int)>(
_onIsolateMain, (connection, i));
final receivePort = iso.ReceivePort();
final isolate = await iso.Isolate.spawn<IsolateMessage>(
_entrypoint,
IsolateMessage(
id: i,
sendPort: receivePort.sendPort,
errors: _errors,
job: _job,
init: _init,
shutdown: _shutdown,
di: di,
),
paused: true,
);
final sup = IsolateSupervisor(
isolate: isolate, receivePort: receivePort, id: i, onError: _onError);
_supervisors.add(sup);
sup.resume();
}
}

/// Start queue server
Future<void> start({int threads = 1}) async {
iso.ReceivePort();
await _spawnOffIsolates(threads);
await _spawn(threads);
await _watchQueue();
}
}

class _IsolateServer {
final Job job;
final DI di;
final List<Hook> errors;
final List<Hook> init;
final List<Hook> shutdown;
final iso.SendPort sendPort;
final int id;

_IsolateServer({
required this.id,
required this.job,
required this.di,
required this.errors,
required this.init,
required this.shutdown,
required this.sendPort,
});

Map<String, dynamic> _getArguments(
Hook hook,
Expand Down Expand Up @@ -192,7 +213,7 @@ class Server {

void executeGroupHooks() {
for (final group in groups) {
for (final hook in _init) {
for (final hook in init) {
if (hook.getGroups().contains(group)) {
final arguments = argsCallback.call(hook);
Function.apply(
Expand All @@ -212,4 +233,68 @@ class Server {
executeGlobalHook();
}
}

Future<void> execute(Message message) async {
dev.log('$id: Job received ${message.pid}');
di.set('message', () => message);
sendPort.send({'type': 'status', 'status': IsolateStatus.working});

try {
final groups = job.getGroups();
if (job.hook) {
await _executeHooks(
init,
groups,
(hook) => _getArguments(hook, message.payload),
globalHook: true,
);
}
final args = _getArguments(job, message.payload);
await Function.apply(
job.getAction(), [...job.argsOrder.map((key) => args[key])]);
if (job.hook) {
await _executeHooks(
shutdown,
groups,
(hook) => _getArguments(hook, message.payload),
globalHook: true,
);
}
dev.log('$id: Job ${message.pid} successfully run');
} catch (e) {
sendPort.send({'type': 'error', 'message': message});
dev.log('$id: Error: Job ${message.pid} failed to run');
dev.log('$id: Error: ${e.toString()}');
_executeHooks(
errors,
[],
(hook) => _getArguments(hook, message.payload),
);
} finally {
sendPort.send({'type': 'status', 'status': IsolateStatus.idle});
}
}
}

Future<void> _entrypoint(IsolateMessage options) async {
final receivePort = iso.ReceivePort();
final server = _IsolateServer(
id: options.id,
job: options.job,
di: options.di,
errors: options.errors,
init: options.init,
shutdown: options.shutdown,
sendPort: options.sendPort,
);

options.sendPort.send(receivePort.sendPort);
dev.log('Worker: ${options.id} waiting fro job');
receivePort.listen((message) async {
if (message is Message) {
await server.execute(message);
} else if (message == IsolateSupervisor.messageClose) {
receivePort.close();
}
});
}
13 changes: 6 additions & 7 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,10 @@ packages:
utopia_di:
dependency: "direct main"
description:
name: utopia_di
sha256: f4a618e0278346e08b6cec8c88ae0135e279b6f4aaeed97739302d014c4024df
url: "https://pub.dev"
source: hosted
version: "0.2.0"
path: "../utopia_di"
relative: true
source: path
version: "0.2.1"
uuid:
dependency: "direct main"
description:
Expand All @@ -373,10 +372,10 @@ packages:
dependency: transitive
description:
name: vm_service
sha256: e7d5ecd604e499358c5fe35ee828c0298a320d54455e791e9dcf73486bc8d9f0
sha256: a75f83f14ad81d5fe4b3319710b90dec37da0e22612326b696c9e1b8f34bbf48
url: "https://pub.dev"
source: hosted
version: "14.1.0"
version: "14.2.0"
watcher:
dependency: transitive
description:
Expand Down
4 changes: 2 additions & 2 deletions pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
name: utopia_queue
description: Light and easy to use queue library for Dart server projects
version: 0.3.1
version: 0.3.2
repository: https://github.com/utopia-dart/utopia_queue

environment:
sdk: '>=3.0.0 <4.0.0'

dependencies:
redis: ^4.0.0
utopia_di: ^0.2.0
utopia_di: ^0.2.1
uuid: ^4.3.3

dev_dependencies:
Expand Down