Skip to content
This repository was archived by the owner on Dec 10, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Procfile

This file was deleted.

2 changes: 0 additions & 2 deletions Procfile.dev

This file was deleted.

38 changes: 36 additions & 2 deletions frontend/spec/mws/Cursor.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ describe('A Cursor', function () {
instance = new mongo.Cursor(coll);

result = {
result: ['test', 'results', 'here']
result: ['test', 'results', 'here'],
count: 3,
cursor_id: 1234
};
makeRequest = spyOn(mongo.request, 'makeRequest').andCallFake(function () {
var success = arguments[5];
Expand Down Expand Up @@ -86,6 +88,10 @@ describe('A Cursor', function () {
});
});

it('will disallow batchSize', function () {
expect(instance.batchSize).toThrow(new Error('batchSize() is disallowed in the web shell'));
});

describe('depending on query state', function () {
var callbackSpy;

Expand All @@ -109,6 +115,17 @@ describe('A Cursor', function () {
var params = makeRequest.mostRecentCall.args[1];
expect(params.query).toEqual(query);
expect(params.projection).toEqual(projection);
expect(params.drain_cursor).toBeUndefined();

instance = new mongo.Cursor(coll, query, projection);
result.count = 4;
result.cursor_id = 1234;
instance._executeQuery();
instance._executeQuery(null, true, true);
params = makeRequest.mostRecentCall.args[1];
expect(params.cursor_id).toEqual(1234);
expect(params.retrieved).toEqual(result.result.length);
expect(params.count).toEqual(4);

instance = new mongo.Cursor(coll, query);
instance._executeQuery();
Expand All @@ -121,6 +138,11 @@ describe('A Cursor', function () {
params = makeRequest.mostRecentCall.args[1];
expect(params.query).toBeUndefined();
expect(params.projection).toBeUndefined();

instance = new mongo.Cursor(coll);
instance._executeQuery(null, true, true);
params = makeRequest.mostRecentCall.args[1];
expect(params.drain_cursor).toBe(true);
});

it('will skip results', function () {
Expand Down Expand Up @@ -167,6 +189,7 @@ describe('A Cursor', function () {

async = false;
instance._executed = false;
instance._result = [];
instance._executeQuery(null, async);
expect(instance._executed).toBe(true);
expect(makeRequest.calls[1].args[6]).toBe(async);
Expand Down Expand Up @@ -237,13 +260,24 @@ describe('A Cursor', function () {
instance._executed = true;
});

it('does not re-execute and calls the on success callback', function () {
it('does not re-execute and calls the on success callback if it has all results', function () {
instance._count = 1;
instance._retrieved = 1;
instance._executeQuery(callbackSpy);
expect(instance._executed).toBe(true);
expect(makeRequest).not.toHaveBeenCalled();
expect(callbackSpy).toHaveBeenCalled();
});

it('re-executes and calls the on success callback if it does not have all results', function () {
instance._count = 10;
instance._retrieved = 1;
instance._executeQuery(callbackSpy);
expect(instance._executed).toBe(true);
expect(makeRequest).toHaveBeenCalled();
expect(callbackSpy).toHaveBeenCalled();
});

it('warns the user and returns true', function () {
var actual = instance._warnIfExecuted('methodName');
expect(actual).toBe(true);
Expand Down
29 changes: 23 additions & 6 deletions frontend/src/mws/Cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ mongo.Cursor = function (collection, query, projection) {
this._query = query || null;
this._fields = projection || null;
this._executed = false;
this._cursorId = 0;
this._count = 0; // total docs to return with skip and limit
this._retrieved = 0;
this._result = [];
console.debug('Created mongo.Cursor:', this);
};
Expand All @@ -41,11 +44,14 @@ mongo.Cursor = function (collection, query, projection) {
* methods such as sort() and enabling result set iteration methods such as
* next(). Will execute onSuccess on query success, or instantly if the query
* was previously successful. onSuccess will be called asynchronously by
* default, or synchronously if given false for the async parameter.
* default, or synchronously if given false for the async parameter. drainCursor
* specifies whether the all remaining results on cursor should be returned.
*/
mongo.Cursor.prototype._executeQuery = function (onSuccess, async) {
mongo.Cursor.prototype._executeQuery = function (onSuccess, async, drainCursor) {
async = typeof async !== 'undefined' ? async : true;
if (!this._executed) {
drainCursor = typeof drainCursor !== 'undefined' ? drainCursor : false;
if ((!this._executed || this._retrieved < this._count) &&
(this._result.length === 0 || drainCursor)) {
console.debug('Executing query:', this);

var url = this._coll.urlBase + 'find';
Expand All @@ -54,10 +60,17 @@ mongo.Cursor.prototype._executeQuery = function (onSuccess, async) {
if (this._fields) { params.projection = this._fields; }
if (this._skip) { params.skip = this._skip; }
if (this._limit) { params.limit = this._limit; }
if (this._sort) {params.sort = this._sort; }
if (this._sort) { params.sort = this._sort; }
if (this._cursorId) { params.cursor_id = this._cursorId; }
if (this._retrieved) { params.retrieved = this._retrieved; }
if (this._count) { params.count = this._count; }
if (drainCursor) { params.drain_cursor = drainCursor; }
var wrappedSuccess = function (data) {
mongo.events.callbackTrigger(this._shell, 'cursor.execute', data.result.slice());
this._storeQueryResult(data.result);
this._cursorId = data.cursor_id || this._cursorId;
this._count = data.count || this._count;
this._retrieved += data.result.length;
if (onSuccess) {
onSuccess();
}
Expand Down Expand Up @@ -189,6 +202,10 @@ mongo.Cursor.prototype.limit = function (limit) {
return this;
};

mongo.Cursor.prototype.batchSize = function () {
throw new Error('batchSize() is disallowed in the web shell');
};

mongo.Cursor.prototype.toArray = function (callback) {
var context = this._shell.evaluator.pause();
if (this._arr) {
Expand All @@ -215,7 +232,7 @@ mongo.Cursor.prototype.toArray = function (callback) {
if (callback) {
callback(this._arr);
}
}.bind(this));
}.bind(this), true, true);
};

mongo.Cursor.prototype.count = function (useSkipLimit) {
Expand Down Expand Up @@ -250,4 +267,4 @@ mongo.Cursor.prototype.__methodMissing = function (field) {
this._shell.evaluator.resume(context, arr[field]);
}.bind(this));
}
};
};
7 changes: 1 addition & 6 deletions run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,4 @@

if __name__ == '__main__':
result = nose.runmodule(name='tests', argv=[
'',
'-s',
'--verbose',
'--logging-level=INFO',
'--rednose',
])
'', '-s', '--verbose', '--logging-level=INFO', '--rednose', ])
119 changes: 115 additions & 4 deletions tests/test_mws_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
from webapps.lib.util import get_internal_coll_name, get_collection_names
from flask import session

from pymongo.cursor import Cursor
from pymongo.errors import OperationFailure
from webapps.lib.MWSServerError import MWSServerError

from tests import MongoWSTestCase
from webapps.lib import CLIENTS_COLLECTION



class ViewsSetUpUnitTestCase(MongoWSTestCase):
def test_create_mws_resource(self):
url = '/mws/'
Expand Down Expand Up @@ -186,7 +186,6 @@ class DBCollectionTestCase(DBTestCase):
def setUp(self):
super(DBCollectionTestCase, self).setUp()


self.coll_name = 'test_collection'
self.internal_coll_name = get_internal_coll_name(self.res_id,
self.coll_name)
Expand All @@ -201,12 +200,17 @@ def tearDown(self):
self.db_collection.drop()

def make_find_request(self, query=None, projection=None, skip=None,
limit=None, expected_status=200):
limit=None, expected_status=200, cursor_id=0,
retrieved=0, count=0, drain_cursor=False):
data = {
'query': query,
'projection': projection,
'skip': skip,
'limit': limit,
'cursor_id': cursor_id,
'retrieved': retrieved,
'count': count,
'drain_cursor': drain_cursor
}
return self._make_request('find', data, self.app.get,
expected_status)
Expand Down Expand Up @@ -252,14 +256,121 @@ def set_session_id(self, new_id):


class FindUnitTestCase(DBCollectionTestCase):
def ensure_cursor_death(self, collection, cursor_id, retrieved):
batch_size = self.real_app.config['CURSOR_BATCH_SIZE']
cursor = Cursor(collection, _cursor_id=cursor_id,
limit=batch_size, _retrieved=retrieved)
try:
cursor.next()
except StopIteration:
pass
except OperationFailure:
pass
else:
self.fail('Cursor was not killed')

def verify_cursor(self, num_docs, **kwargs):
self.db_collection.drop()

docs = [{'val': i} for i in xrange(num_docs)]
total_received = 0

self.db_collection.insert(docs)

query = {}
response = self.make_find_request(query=query, **kwargs)
count = response['count']

if kwargs.get('limit') is not None:
self.assertEqual(count, kwargs['limit'])
else:
self.assertEqual(count, num_docs)

expected = kwargs['limit'] if kwargs.get('limit') else num_docs

if kwargs.get('drain_cursor'):
self.assertEqual(len(response['result']), count)
total_received += len(response['result'])

while total_received != expected:
values = [r['val'] for r in response['result']]
cursor_id = response['cursor_id']
retrieved = len(response['result'])

self.assertItemsEqual(
values, range(total_received, total_received+retrieved))

total_received += retrieved
if total_received == expected:
break
response = self.make_find_request(query=query, cursor_id=cursor_id,
retrieved=total_received,
count=count, **kwargs)

self.ensure_cursor_death(self.db_collection,
long(response['cursor_id']),
retrieved=total_received)

def test_find(self):
query = {'name': 'mongo'}
self.db_collection.insert(query)

result = self.make_find_request(query)
self.assertEqual(len(result), 1)
self.assertEqual(len(result), 3)
self.assertEqual(result['count'], 1)
self.assertEqual(result['cursor_id'], '0')
self.assertEqual(len(result['result']), 1)
self.assertEqual(result['result'][0]['name'], 'mongo')

def test_cursor(self):
batch_size = self.real_app.config['CURSOR_BATCH_SIZE']

self.verify_cursor(83)
self.verify_cursor(100)
self.verify_cursor(250)
self.verify_cursor(batch_size)
self.verify_cursor(batch_size+1)
self.verify_cursor(batch_size-1)

def test_invalid_cursor(self):
query = {}
invalid_cursor = 1234
docs = [{'val': i} for i in xrange(21)]
self.db_collection.insert(docs)

response = self.make_find_request(query=query)
count = response['count']
retrieved = len(response['result'])

self.make_find_request(query=query, cursor_id=invalid_cursor,
retrieved=retrieved, count=count,
expected_status=400)

def test_cursor_with_limit(self):
batch_size = self.real_app.config['CURSOR_BATCH_SIZE']

self.verify_cursor(100, limit=83)
self.verify_cursor(100, limit=100)
self.verify_cursor(100, limit=batch_size)
self.verify_cursor(100, limit=batch_size+1)
self.verify_cursor(100, limit=batch_size-1)

def test_cursor_drain(self):
batch_size = self.real_app.config['CURSOR_BATCH_SIZE']

self.verify_cursor(100, drain_cursor=True)
self.verify_cursor(batch_size, drain_cursor=True)
self.verify_cursor(batch_size+1, drain_cursor=True)
self.verify_cursor(batch_size-1, drain_cursor=True)

def test_cursor_drain_with_limit(self):
batch_size = self.real_app.config['CURSOR_BATCH_SIZE']

self.verify_cursor(100, limit=100, drain_cursor=True)
self.verify_cursor(100, limit=batch_size, drain_cursor=True)
self.verify_cursor(100, limit=batch_size+1, drain_cursor=True)
self.verify_cursor(100, limit=batch_size-1, drain_cursor=True)

def test_skipping_results(self):
self.db_collection.insert([{'val': i} for i in xrange(10)])

Expand Down
2 changes: 2 additions & 0 deletions webapps/configs/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@
# 0: user is unable to create additional collections
# 1+: user may have up to # collections per res_id
QUOTA_NUM_COLLECTIONS = 8

CURSOR_BATCH_SIZE = 20 # default max docs to return for a query
28 changes: 28 additions & 0 deletions webapps/lib/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from flask import current_app
import pymongo
from pymongo.cursor_manager import CursorManager

from MWSServerError import MWSServerError

Expand All @@ -25,6 +26,33 @@
db = None


class KeepAliveCursorManager(CursorManager):
"""A cursor manager that does not kill cursors
"""
def close(self, cursor_id):
# refuse to kill the cursor
pass


def get_keepalive_db(MWSExceptions=True):
config = current_app.config
try:
client = pymongo.MongoClient(
config.get('DB_HOST'),
config.get('DB_PORT'))
client.set_cursor_manager(KeepAliveCursorManager)
db = client[config.get('DB_NAME')]
if 'username' in config:
db.authenticate(config.get('username'), config.get('password'))
return db
except Exception as e:
if MWSExceptions:
debug = config['DEBUG']
msg = str(e) if debug else 'An unexpected error occurred.'
raise MWSServerError(500, msg)
raise


def get_db(MWSExceptions=True):
global db
config = current_app.config
Expand Down
Loading