Skip to content

Commit d6e147c

Browse files
[9.4] Improve server error handling in Apache Arrow helper (#3276) (#3277)
Co-authored-by: Josh Mock <joshua.mock@elastic.co>
1 parent 18b24f0 commit d6e147c

4 files changed

Lines changed: 153 additions & 4 deletions

File tree

src/helpers.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,13 +1024,34 @@ export default class Helpers {
10241024
if (metaHeader !== null) {
10251025
reqOptions.headers = reqOptions.headers ?? {}
10261026
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
1027-
reqOptions.asStream = true
10281027
}
10291028

10301029
params.format = 'arrow'
1030+
reqOptions.asStream = true
1031+
reqOptions.meta = true
1032+
1033+
const result = await client.esql.query(params, reqOptions) as unknown as TransportResult<Readable, unknown>
1034+
if (result.statusCode >= 400) {
1035+
const chunks: Buffer[] = []
1036+
if (Buffer.isBuffer(result.body)) {
1037+
chunks.push(result.body)
1038+
} else {
1039+
for await (const chunk of result.body) {
1040+
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
1041+
}
1042+
}
1043+
const body = Buffer.concat(chunks)
1044+
let parsedBody: any
1045+
try {
1046+
parsedBody = JSON.parse(body.toString())
1047+
} catch {
1048+
parsedBody = body.toString()
1049+
}
1050+
result.body = parsedBody
1051+
throw new ResponseError(result as any)
1052+
}
10311053

1032-
const response = await client.esql.query(params, reqOptions) as unknown as Readable
1033-
return await AsyncRecordBatchStreamReader.from(response)
1054+
return await AsyncRecordBatchStreamReader.from(result.body)
10341055
}
10351056
}
10361057

test/unit/helpers/bulk.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,32 @@ test('bulk index', t => {
904904
t.ok(called, 'async onFlush should be awaited')
905905
})
906906

907+
t.test('onFlush error causes bulk to reject', async t => {
908+
const MockConnection = connection.buildMockConnection({
909+
onRequest () {
910+
return { body: { errors: false, items: [{}] } }
911+
}
912+
})
913+
914+
const client = new Client({
915+
node: 'http://localhost:9200',
916+
Connection: MockConnection
917+
})
918+
919+
await t.rejects(
920+
client.helpers.bulk({
921+
datasource: dataset.slice(),
922+
onDocument (doc) {
923+
return { index: { _index: 'test' } }
924+
},
925+
onFlush () {
926+
return Promise.reject(new Error('onFlush failed'))
927+
}
928+
}),
929+
{ message: 'onFlush failed' }
930+
)
931+
})
932+
907933
t.end()
908934
})
909935

test/unit/helpers/esql.test.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import { test } from 'tap'
77
import * as arrow from 'apache-arrow'
88
import { connection } from '../../utils'
9-
import { Client } from '../../../'
9+
import { Client, errors } from '../../../'
1010

1111
test('ES|QL helper', t => {
1212
test('toRecords', t => {
@@ -312,6 +312,77 @@ test('ES|QL helper', t => {
312312
t.end()
313313
})
314314

315+
t.test('Throws ResponseError when server returns a JSON error', async t => {
316+
const errorBody = {
317+
error: {
318+
root_cause: [{
319+
type: 'illegal_argument_exception',
320+
reason: 'ES|QL type [date_range] is not supported by the Arrow format'
321+
}],
322+
type: 'illegal_argument_exception',
323+
reason: 'ES|QL type [date_range] is not supported by the Arrow format'
324+
},
325+
status: 400
326+
}
327+
328+
const MockConnection = connection.buildMockConnection({
329+
onRequest (_params) {
330+
return {
331+
body: JSON.stringify(errorBody),
332+
statusCode: 400,
333+
headers: {
334+
'content-type': 'application/vnd.elasticsearch+json;compatible-with=9'
335+
}
336+
}
337+
}
338+
})
339+
340+
const client = new Client({
341+
node: 'http://localhost:9200',
342+
Connection: MockConnection
343+
})
344+
345+
try {
346+
await client.helpers.esql({ query: 'FROM test-range | LIMIT 1' }).toArrowReader()
347+
t.fail('Should throw')
348+
} catch (err: any) {
349+
t.ok(err instanceof errors.ResponseError)
350+
t.equal(err.statusCode, 400)
351+
t.equal(err.message, 'illegal_argument_exception\n\tRoot causes:\n\t\tillegal_argument_exception: ES|QL type [date_range] is not supported by the Arrow format')
352+
}
353+
354+
t.end()
355+
})
356+
357+
t.test('Throws ResponseError with non-JSON error body', async t => {
358+
const MockConnection = connection.buildMockConnection({
359+
onRequest (_params) {
360+
return {
361+
body: 'internal server error',
362+
statusCode: 500,
363+
headers: {
364+
'content-type': 'text/plain'
365+
}
366+
}
367+
}
368+
})
369+
370+
const client = new Client({
371+
node: 'http://localhost:9200',
372+
Connection: MockConnection
373+
})
374+
375+
try {
376+
await client.helpers.esql({ query: 'FROM test | LIMIT 1' }).toArrowReader()
377+
t.fail('Should throw')
378+
} catch (err: any) {
379+
t.ok(err instanceof errors.ResponseError)
380+
t.equal(err.statusCode, 500)
381+
}
382+
383+
t.end()
384+
})
385+
315386
t.end()
316387
})
317388

test/unit/helpers/msearch.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,3 +758,34 @@ test('Should use req options', async t => {
758758

759759
t.teardown(() => m.stop())
760760
})
761+
762+
test('documents returns empty array when hits is absent', async t => {
763+
const MockConnection = connection.buildMockConnection({
764+
onRequest (_params) {
765+
return {
766+
body: {
767+
responses: [{
768+
status: 200,
769+
aggregations: { count: { value: 5 } }
770+
}]
771+
}
772+
}
773+
}
774+
})
775+
776+
const client = new Client({
777+
node: 'http://localhost:9200',
778+
Connection: MockConnection
779+
})
780+
781+
const m = client.helpers.msearch({ operations: 1 })
782+
783+
const result = await m.search(
784+
{ index: 'test' },
785+
{ query: { match_all: {} } }
786+
)
787+
788+
t.same(result.documents, [])
789+
790+
t.teardown(() => m.stop())
791+
})

0 commit comments

Comments
 (0)