source: trunk/manage-us3-pipe.php@ 38

Last change on this file since 38 was 35, checked in by gegorbet, 7 years ago

mods mostly for use of mysqli

File size: 10.3 KB
RevLine 
[1]1<?php
2
// Listener entry point: read null-terminated messages from the named pipe and
// hand each one to process() until a "Stop listen" message arrives.
//
// NOTE(review): $class_dir, $pipe, $self and write_log() are not defined in
// this file; presumably they come from listen-config.php - confirm.
$us3bin = exec( "ls -d ~us3/lims/bin" );
include "$us3bin/listen-config.php";
include "$class_dir/experiment_status.php";

write_log( "$self: Starting" );

$handle = fopen( $pipe, "r+" );

// fopen() reports failure with false, so test for exactly that
if ( $handle === false )
{
  write_log( "$self: Cannot open pipe" );
  exit( -1 );
}

$msg = "";

// From a pipe, we don't know when the message terminates, so the sender
// added a null to indicate the end of each message
do
{
  $input = fgetc( $handle );   // Read one character at a time

  if ( $input === false )
  {
    // fgetc() returns false rather than a character when nothing was read.
    // At a real end-of-file no more data can ever arrive, so stop instead
    // of spinning forever; otherwise simply try the read again.
    if ( feof( $handle ) )
    {
      write_log( "$self: Pipe closed unexpectedly" );
      break;
    }

    continue;
  }

  $msg .= $input;

  if ( $input === chr( 0 ) )
  {
    // Go do some work; rtrim() also strips the terminating null
    $msg = rtrim( $msg );
    if ( $msg == "Stop listen" ) break;
    process( $msg );
    write_log( "$self: $msg" );
    $msg = "";
  }
} while ( true );

fclose( $handle );

write_log( "$self: Stopping" );
exit();
39
// Handle one message taken from the pipe.
//
// The format of the messages is
//   db-requestID: message      ( header and text separated by colon-space )
//
// The gfacID of the newest HPCAnalysisResult row for the request is looked
// up in database $db, then the message is dispatched to update_db() and to
// update_gfac() / update_aira() / notify_gfac_done() depending on the kind
// of job ( Airavata/Thrift, GFAC or local ) and on how the message starts
// ( "Finished", "Starting", "Abort" or anything else ).
//
//   $msg - the complete message text, without the terminating null
//
// Returns nothing; failures are reported through write_log().
function process( $msg )
{
  global $dbhost;
  global $user;
  global $passwd;
  global $self;

  $list    = explode( ": ", $msg );
  $header  = explode( "-", array_shift( $list ) );
  $message = implode( ": ", $list );

  // A header without "-" carries no request ID; treat it as 0, which is
  // what the integer conversion of a missing value gives
  $db        = $header[ 0 ];
  $requestID = isset( $header[ 1 ] ) ? (int) $header[ 1 ] : 0;

  // We need the gfacID
  $resource = mysqli_connect( $dbhost, $user, $passwd, $db );

  if ( ! $resource )
  {
    // There is no link to ask after a failed connect, so the reason has to
    // come from mysqli_connect_error()
    write_log( "$self process(): Could not connect to MySQL - " . mysqli_connect_error() );
    write_log( "$self process(): original msg - $msg" );
    return;
  }

  $query  = "SELECT gfacID FROM HPCAnalysisResult " .
            "WHERE HPCAnalysisRequestID=$requestID " .
            "ORDER BY HPCAnalysisResultID DESC " .
            "LIMIT 1";

  $result = mysqli_query( $resource, $query );

  if ( ! $result )
  {
    write_log( "$self process(): Bad query: $query" );
    write_log( "$self process(): original msg - $msg" );
    mysqli_close( $resource );
    return;
  }

  $row = mysqli_fetch_row( $result );
  mysqli_free_result( $result );
  mysqli_close( $resource );

  if ( ! $row )
  {
    // Without a result row there is nothing the update functions could
    // change, so report it here and stop
    write_log( "$self process(): No HPCAnalysisResult row for request $requestID" );
    write_log( "$self process(): original msg - $msg" );
    return;
  }

  // The column itself may still be NULL; keep it a string for the matches
  $gfacID = (string) $row[ 0 ];

  // Set flags for Airavata/Thrift and "Finished..."
  $is_athrift  = preg_match( "/^US3-A/i", $gfacID );
  $is_finished = preg_match( "/^Finished/i", $message );

  if ( $is_athrift )
  {  // Process submitted thru Airavata/Thrift
    if ( $is_finished )
    {  // Message is "Finished..." : Update message and status
      write_log( "$self process(): Thrift + Finished" );
      update_db( $db, $requestID, 'finished', $message );
      update_aira( $gfacID, $message );   // wait for Airavata to deposit data
    }
    else
    {  // Other messages : just update message
      $updmsg = 'update';
      if ( preg_match( "/^Starting/i", $message ) )
        $updmsg = 'starting';
      if ( preg_match( "/^Abort/i", $message ) )
        $updmsg = 'aborted';

      update_db( $db, $requestID, $updmsg, $message );
      update_gfac( $gfacID, "UPDATING", $message );
    }
  }

  else
  {  // Not Airavata/Thrift
    if ( $is_finished )
    {  // Handle "Finished..." message
      $hex = "[0-9a-fA-F]";

      if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
           preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
      {
        // Then it's a GFAC job
        update_db( $db, $requestID, 'finished', $message );
        update_gfac( $gfacID, "UPDATING", $message );  // wait for GFAC to deposit data
        notify_gfac_done( $gfacID );                   // notify them to go get it
      }

      else
      {
        // It's a local job
        update_db( $db, $requestID, 'finished', $message );
        update_gfac( $gfacID, "COMPLETE", $message );  // data should be there already
      }
    }

    else if ( preg_match( "/^Starting/i", $message ) )
    {
      update_db( $db, $requestID, 'starting', $message );
      update_gfac( $gfacID, "RUNNING", $message );
    }

    else if ( preg_match( "/^Abort/i", $message ) )
    {
      update_db( $db, $requestID, 'aborted', $message );
      update_gfac( $gfacID, "CANCELED", $message );
    }

    else
    {
      update_db( $db, $requestID, 'update', $message );
      update_gfac( $gfacID, "UPDATING", $message );
    }
  }
}
151
// Record a message, and where applicable a new queue status and timestamp,
// in the newest HPCAnalysisResult row belonging to a request.
//
//   $db        - name of the database to connect to
//   $requestID - HPCAnalysisRequestID whose newest result row is updated
//   $action    - "starting" ( queueStatus running, startTime set ),
//                "aborted"  ( queueStatus aborted, endTime set ),
//                "finished" ( queueStatus completed, endTime set );
//                anything else, including "update", changes only the message
//   $message   - text stored in lastMessage
//
// Returns nothing; failures are reported through write_log().
function update_db( $db, $requestID, $action, $message )
{
  global $dbhost;
  global $user;
  global $passwd;
  global $self;

  $resource = mysqli_connect( $dbhost, $user, $passwd, $db );

  if ( ! $resource )
  {
    // No link exists after a failed connect; mysqli_connect_error() holds
    // the reason
    write_log( "$self: Could not connect to DB $db " . mysqli_connect_error() );
    return;
  }

  $requestID = (int) $requestID;   // only ever used as a number in the SQL

  $query  = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
            "WHERE HPCAnalysisRequestID=$requestID " .
            "ORDER BY HPCAnalysisResultID DESC " .
            "LIMIT 1";

  $result = mysqli_query( $resource, $query );

  if ( ! $result )
  {
    write_log( "$self: Bad query: $query" );
    mysqli_close( $resource );
    return;
  }

  $row = mysqli_fetch_row( $result );
  mysqli_free_result( $result );

  if ( ! $row )
  {
    // No row to update; going on would build "WHERE HPCAnalysisResultID="
    write_log( "$self: No HPCAnalysisResult row for request $requestID" );
    mysqli_close( $resource );
    return;
  }

  $resultID = (int) $row[ 0 ];

  $query = "UPDATE HPCAnalysisResult SET ";

  switch ( $action )
  {
    case "starting":
      $query .= "queueStatus='running'," .
                "startTime=now(), ";
      break;

    case "aborted":
      $query .= "queueStatus='aborted'," .
                "endTime=now(), ";
      break;

    case "finished":
      $query .= "queueStatus='completed'," .
                "endTime=now(), ";
      break;

    case "update":
    default:
      // Message only
      break;
  }

  $query .= "lastMessage='" . mysqli_real_escape_string( $resource, $message ) . "' " .
            "WHERE HPCAnalysisResultID=$resultID";

  if ( ! mysqli_query( $resource, $query ) )
    write_log( "$self: Bad query: $query " . mysqli_error( $resource ) );

  mysqli_close( $resource );
}
214
// Function to update the global database status
//
// Updates the row of the analysis table identified by $gfacID in the global
// GFAC database and appends the message to the queue_messages table.
//
//   $gfacID  - value matched against analysis.gfacID
//   $status  - RUNNING, UPDATING, CANCELED or COMPLETE ( any letter case );
//              UPDATING stores only the message and leaves status untouched
//   $message - text stored in queue_msg and added to queue_messages
//
// Returns nothing; failures are reported through write_log().
function update_gfac( $gfacID, $status, $message )
{
  global $dbhost;
  global $guser;
  global $gpasswd;
  global $gDB;
  global $self;

  $allowed_status = array( 'RUNNING',
                           'UPDATING',
                           'CANCELED',
                           'COMPLETE'
                         );

  // Reject a bad status before opening a connection that would then have
  // to be closed again
  $status = strtoupper( $status );
  if ( ! in_array( $status, $allowed_status, true ) )
  {
    write_log( "$self: update_gfac status $status not allowed" );
    return;
  }

  // Get data from global GFAC DB
  $gLink = mysqli_connect( $dbhost, $guser, $gpasswd, $gDB );
  if ( ! $gLink )
  {
    // No link exists after a failed connect; mysqli_connect_error() holds
    // the reason
    write_log( "$self: Could not select DB $gDB " . mysqli_connect_error() );
    return;
  }

  // Both values end up inside quoted SQL strings
  $sqlID  = mysqli_real_escape_string( $gLink, (string) $gfacID );
  $sqlMsg = mysqli_real_escape_string( $gLink, (string) $message );

  // if 'UPDATING' then only the message changes, otherwise the status too;
  // $status is one of the fixed words above, so it is safe to embed
  $setStatus = ( $status == 'UPDATING' ) ? "" : "status='$status', ";

  $query = "UPDATE analysis SET " . $setStatus .
           "queue_msg='$sqlMsg' " .
           "WHERE gfacID='$sqlID'";

  if ( ! mysqli_query( $gLink, $query ) )
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );

  // Also update the queue_messages table
  $query  = "SELECT id FROM analysis " .
            "WHERE gfacID = '$sqlID'";
  $result = mysqli_query( $gLink, $query );
  if ( ! $result )
  {
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
    mysqli_close( $gLink );
    return;
  }

  $row = mysqli_fetch_row( $result );
  mysqli_free_result( $result );

  if ( ! $row )
  {
    write_log( "$self: can't find $gfacID in GFAC db" );
    mysqli_close( $gLink );
    return;
  }

  $aID = (int) $row[ 0 ];

  $query  = "INSERT INTO queue_messages " .
            "SET analysisID = $aID, " .
            "message = '$sqlMsg'";

  if ( ! mysqli_query( $gLink, $query ) )
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );

  mysqli_close( $gLink );
}
296
// Tell the GFAC service that the "Finished" UDP message for a job has
// arrived, by issuing a GET request to $serviceURL/setstatus/<gfacID>.
//
//   $gfacID - job identifier; only IDs starting with "US3-Experiment"
//             ( any letter case ) or of the form "US3-" followed by an
//             8-4-4-4-12 group of hex digits are treated as GFAC jobs
//
// Returns false when the ID is not a GFAC job or the request raises an
// HttpException ( which is also logged ), true otherwise.
function notify_gfac_done( $gfacID )
{
  global $serviceURL;
  global $self;

  $hex = "[0-9a-fA-F]";

  $isExperiment = preg_match( "/^US3-Experiment/i", $gfacID );
  $isUuidStyle  = preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID );

  if ( ! ( $isExperiment || $isUuidStyle ) )
  {
    // Neither naming scheme matches, so this is not a GFAC job
    return false;
  }

  $target = "$serviceURL/setstatus/$gfacID";

  try
  {
    $request = new HttpRequest( $target, HttpRequest::METH_GET );
    $request->send();

    // The response body is fetched but its content is not examined
    $request->getResponseBody();
  }
  catch ( HttpException $e )
  {
    write_log( "$self: Set status unsuccessful - $gfacID" );
    return false;
  }

  return true;
}
[6]331
// Function to update the global database status (AThrift + Finished)
//
// Sets status to FINISHED and stores the message for the row of the
// analysis table identified by $gfacID in the global GFAC database, then
// appends the message to the queue_messages table.
//
//   $gfacID  - value matched against analysis.gfacID
//   $message - text stored in queue_msg and added to queue_messages
//
// Returns nothing; failures are reported through write_log(), except that
// a gfacID missing from the analysis table is skipped silently.
function update_aira( $gfacID, $message )
{
  global $dbhost;
  global $guser;
  global $gpasswd;
  global $gDB;
  global $self;

  // Get data from global GFAC DB
  $gLink = mysqli_connect( $dbhost, $guser, $gpasswd, $gDB );
  if ( ! $gLink )
  {
    // No link exists after a failed connect; mysqli_connect_error() holds
    // the reason
    write_log( "$self: Could not connect to DB $gDB " . mysqli_connect_error() );
    return;
  }

  // Both values end up inside quoted SQL strings
  $sqlID  = mysqli_real_escape_string( $gLink, (string) $gfacID );
  $sqlMsg = mysqli_real_escape_string( $gLink, (string) $message );

  // Update message and update status to 'FINISHED'
  $query = "UPDATE analysis SET status='FINISHED', " .
           "queue_msg='$sqlMsg' " .
           "WHERE gfacID='$sqlID'";

  // Only claim success in the log when the statement actually ran
  if ( mysqli_query( $gLink, $query ) )
    write_log( "$self: Status FINISHED and 'Finished...' message updated" );
  else
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );

  // Also update the queue_messages table
  $query  = "SELECT id FROM analysis " .
            "WHERE gfacID = '$sqlID'";
  $result = mysqli_query( $gLink, $query );
  if ( ! $result )
  {
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
    mysqli_close( $gLink );
    return;
  }

  $row = mysqli_fetch_row( $result );
  mysqli_free_result( $result );

  if ( ! $row )
  {
    // Unknown gfacID: nothing to attach the message to
    mysqli_close( $gLink );
    return;
  }

  $aID = (int) $row[ 0 ];

  $query  = "INSERT INTO queue_messages " .
            "SET analysisID = $aID, " .
            "message = '$sqlMsg'";

  if ( ! mysqli_query( $gLink, $query ) )
    write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );

  mysqli_close( $gLink );
}
[1]387?>
Note: See TracBrowser for help on using the repository browser.