99*/
1010package org .opensearch .security ;
1111
12+ import java .io .BufferedReader ;
1213import java .io .IOException ;
14+ import java .io .InputStreamReader ;
1315import java .util .List ;
1416import java .util .Map ;
1517import java .util .concurrent .ExecutionException ;
7072import org .opensearch .client .Client ;
7173import org .opensearch .client .ClusterAdminClient ;
7274import org .opensearch .client .IndicesAdminClient ;
75+ import org .opensearch .client .Request ;
76+ import org .opensearch .client .Response ;
7377import org .opensearch .client .RestHighLevelClient ;
7478import org .opensearch .client .core .CountRequest ;
7579import org .opensearch .client .indices .CloseIndexRequest ;
96100import org .opensearch .repositories .RepositoryMissingException ;
97101import org .opensearch .core .rest .RestStatus ;
98102import org .opensearch .search .builder .SearchSourceBuilder ;
103+ import org .opensearch .security .auditlog .AuditLog ;
99104import org .opensearch .test .framework .AuditCompliance ;
100105import org .opensearch .test .framework .AuditConfiguration ;
101106import org .opensearch .test .framework .AuditFilters ;
@@ -1116,6 +1121,37 @@ public void shouldUpdateDocumentsInBulk_negative() throws IOException {
11161121
11171122 @ Test
11181123 public void shouldDeleteDocumentInBulk_positive () throws IOException {
1124+ /**
1125+ Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method:
1126+ {@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()}
1127+ to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request.
1128+ the process is:
1129+ 1. create 5-node cluster, with 3 cluster managers
1130+ 2. create 2-shard, 2-replica index, find out which node holds the data
1131+ 3. create second high level rest client, using node found in previous step, for all calls. Minimize transport requests between cluster nodes.
1132+ */
1133+ //create index
1134+ Settings sourceIndexSettings = Settings .builder ()
1135+ .put ("index.number_of_replicas" , 2 )
1136+ .put ("index.number_of_shards" , 2 )
1137+ .build ();
1138+ IndexOperationsHelper .createIndex (cluster , WRITE_SONG_INDEX_NAME , sourceIndexSettings );
1139+ try (RestHighLevelClient restHighLevelClient = cluster .getRestHighLevelClient (ADMIN_USER )) {
1140+ // cat shards, get index and node name
1141+ // getNodeByName()
1142+ // get rest client of node
1143+ Request getIndicesRequest = new Request ("GET" , "/_cat/shards?v" );
1144+ // High level client doesn't support _cat/shards API
1145+ Response getIndicesResponse = restHighLevelClient .getLowLevelClient ().performRequest (getIndicesRequest );
1146+ String primaryNode = new BufferedReader (new InputStreamReader (getIndicesResponse .getEntity ().getContent ())).lines ()
1147+ .map (s -> s .split ("\\ s+" ))
1148+ .filter (strings -> strings [0 ].equals (WRITE_SONG_INDEX_NAME ))
1149+ .filter (strings -> strings [2 ].equals ("p" ))
1150+ .map (strings -> strings [7 ])
1151+ .findAny ().orElseThrow (() -> new IllegalStateException ("no primary shard found" ));
1152+ cluster .setPrimaryNode (primaryNode );
1153+ }
1154+
11191155 try (RestHighLevelClient restHighLevelClient = cluster .getRestHighLevelClient (LIMITED_WRITE_USER )) {
11201156 BulkRequest bulkRequest = new BulkRequest ().setRefreshPolicy (IMMEDIATE );
11211157 bulkRequest .add (new IndexRequest (WRITE_SONG_INDEX_NAME ).id ("one" ).source (SONGS [0 ].asMap ()));
@@ -1140,9 +1176,71 @@ public void shouldDeleteDocumentInBulk_positive() throws IOException {
11401176 }
11411177 auditLogsRule .assertExactly (2 , userAuthenticated (LIMITED_WRITE_USER ).withRestRequest (POST , "/_bulk" ));
11421178 auditLogsRule .assertExactly (2 , grantedPrivilege (LIMITED_WRITE_USER , "BulkRequest" ));
1143- auditLogsRule .assertExactly (2 , grantedPrivilege (LIMITED_WRITE_USER , "CreateIndexRequest" ));
11441179 auditLogsRule .assertExactly (4 , grantedPrivilege (LIMITED_WRITE_USER , "PutMappingRequest" ));
1145- auditLogsRule .assertExactly (6 , auditPredicate (INDEX_EVENT ).withEffectiveUser (LIMITED_WRITE_USER ));
1180+ auditLogsRule .assertExactly (4 , auditPredicate (INDEX_EVENT ).withEffectiveUser (LIMITED_WRITE_USER ));
1181+ auditLogsRule .assertExactly (13 , auditPredicate (null ).withLayer (AuditLog .Origin .TRANSPORT ));
1182+ }
1183+
1184+ @ Test
1185+ public void shouldDeleteDocumentInBulk_positiveTransportRequests () throws IOException {
1186+ /**
1187+ Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method:
1188+ {@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()}
1189+ to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request.
1190+ the process is:
1191+ 1. create 5-node cluster, with 3 cluster managers
1192+ 2. create 2-shard, 2-replica index, find out which node is manager, doesn't hold data
1193+ 3. create second high level rest client, using node found in previous step, for all calls. Maximize transport requests between cluster nodes.
1194+ */
1195+ //create index
1196+ Settings sourceIndexSettings = Settings .builder ()
1197+ .put ("index.number_of_replicas" , 2 )
1198+ .put ("index.number_of_shards" , 2 )
1199+ .build ();
1200+ IndexOperationsHelper .createIndex (cluster , WRITE_SONG_INDEX_NAME , sourceIndexSettings );
1201+ try (RestHighLevelClient restHighLevelClient = cluster .getRestHighLevelClient (LIMITED_WRITE_USER )) {
1202+ // cat shards, get index and node name
1203+ // getNodeByName()
1204+ // get rest client of node
1205+ Request getIndicesRequest = new Request ("GET" , "/_cat/nodes?v" );
1206+ // High level client doesn't support _cat/shards API
1207+ Response getIndicesResponse = restHighLevelClient .getLowLevelClient ().performRequest (getIndicesRequest );
1208+ String primaryNode = new BufferedReader (new InputStreamReader (getIndicesResponse .getEntity ().getContent ())).lines ()
1209+ .map (s -> s .split ("\\ s+" ))
1210+ .filter (strings -> strings [9 ].equals ("*" ))
1211+ .map (strings -> strings [10 ])
1212+ .findAny ().orElseThrow (() -> new IllegalStateException ("no cluster manager found" ));
1213+ cluster .setPrimaryNode (primaryNode );
1214+ }
1215+
1216+ try (RestHighLevelClient restHighLevelClient = cluster .getRestHighLevelClient (LIMITED_WRITE_USER )) {
1217+
1218+ BulkRequest bulkRequest = new BulkRequest ().setRefreshPolicy (IMMEDIATE );
1219+ bulkRequest .add (new IndexRequest (WRITE_SONG_INDEX_NAME ).id ("one" ).source (SONGS [0 ].asMap ()));
1220+ bulkRequest .add (new IndexRequest (WRITE_SONG_INDEX_NAME ).id ("two" ).source (SONGS [1 ].asMap ()));
1221+ bulkRequest .add (new IndexRequest (WRITE_SONG_INDEX_NAME ).id ("three" ).source (SONGS [2 ].asMap ()));
1222+ bulkRequest .add (new IndexRequest (WRITE_SONG_INDEX_NAME ).id ("four" ).source (SONGS [3 ].asMap ()));
1223+ assertThat (restHighLevelClient .bulk (bulkRequest , DEFAULT ), successBulkResponse ());
1224+ bulkRequest = new BulkRequest ().setRefreshPolicy (IMMEDIATE );
1225+ bulkRequest .add (new DeleteRequest (WRITE_SONG_INDEX_NAME , "one" ));
1226+ bulkRequest .add (new DeleteRequest (WRITE_SONG_INDEX_NAME , "three" ));
1227+
1228+ BulkResponse response = restHighLevelClient .bulk (bulkRequest , DEFAULT );
1229+
1230+ assertThat (response , successBulkResponse ());
1231+ assertThat (internalClient , not (clusterContainsDocument (WRITE_SONG_INDEX_NAME , "one" )));
1232+ assertThat (internalClient , not (clusterContainsDocument (WRITE_SONG_INDEX_NAME , "three" )));
1233+ assertThat (
1234+ internalClient ,
1235+ clusterContainsDocumentWithFieldValue (WRITE_SONG_INDEX_NAME , "two" , FIELD_TITLE , TITLE_SONG_1_PLUS_1 )
1236+ );
1237+ assertThat (internalClient , clusterContainsDocumentWithFieldValue (WRITE_SONG_INDEX_NAME , "four" , FIELD_TITLE , TITLE_POISON ));
1238+ }
1239+ auditLogsRule .assertExactly (2 , userAuthenticated (LIMITED_WRITE_USER ).withRestRequest (POST , "/_bulk" ));
1240+ auditLogsRule .assertExactly (2 , grantedPrivilege (LIMITED_WRITE_USER , "BulkRequest" ));
1241+ auditLogsRule .assertExactly (4 , grantedPrivilege (LIMITED_WRITE_USER , "PutMappingRequest" ));
1242+ auditLogsRule .assertExactly (4 , auditPredicate (INDEX_EVENT ).withEffectiveUser (LIMITED_WRITE_USER ));
1243+ auditLogsRule .assertExactly (14 , auditPredicate (null ).withLayer (AuditLog .Origin .TRANSPORT ));
11461244 }
11471245
11481246 @ Test
0 commit comments