source: trunk/manage-us3-pipe.php@ 41

Last change on this file since 41 was 35, checked in by gegorbet, 7 years ago

mods mostly for use of mysqli

File size: 10.3 KB
Line 
1<?php
2
3$us3bin = exec( "ls -d ~us3/lims/bin" );
4include "$us3bin/listen-config.php";
5include "$class_dir/experiment_status.php";
6
7write_log( "$self: Starting" );
8
9$handle = fopen( $pipe, "r+" );
10
11if ( $handle == NULL )
12{
13 write_log( "$self: Cannot open pipe" );
14 exit( -1 );
15}
16
17$msg = "";
18
19// From a pipe, we don't know when the message terminates, so the sender
20// added a null to indicate the end of each message
21do
22{
23 $input = fgetc( $handle ); // Read one character at a time
24 $msg .= $input;
25
26 if ( $input[ 0 ] == chr( 0 ) )
27 {
28 // Go do some work
29 $msg = rtrim( $msg );
30 if ( $msg == "Stop listen" ) break;
31 process( $msg );
32 write_log( "$self: $msg" );
33 $msg = "";
34 }
35} while ( true );
36
37write_log( "$self: Stopping" );
38exit();
39
40// The format of the messages would be
41// db-requestID: message ( colon-space )
42function process( $msg )
43{
44 global $dbhost;
45 global $user;
46 global $passwd;
47 global $self;
48
49 $list = explode( ": ", $msg );
50 list( $db, $requestID ) = explode( "-", array_shift( $list ) );
51 $message = implode( ": ", $list );
52
53 // Convert to integer
54 settype( $requestID, 'integer' );
55
56 // We need the gfacID
57 $resource = mysqli_connect( $dbhost, $user, $passwd, $db );
58
59 if ( ! $resource )
60 {
61 write_log( "$self process(): Could not connect to MySQL - " . mysqli_error($resource) );
62 write_log( "$self process(): original msg - $msg" );
63 return;
64 }
65
66 $query = "SELECT gfacID FROM HPCAnalysisResult " .
67 "WHERE HPCAnalysisRequestID=$requestID " .
68 "ORDER BY HPCAnalysisResultID DESC " .
69 "LIMIT 1";
70
71 $result = mysqli_query( $resource, $query );
72
73 if ( ! $result )
74 {
75 write_log( "$self process(): Bad query: $query" );
76 write_log( "$self process(): original msg - $msg" );
77 return;
78 }
79
80 // Set flags for Airavata/Thrift and "Finished..."
81 list( $gfacID ) = mysqli_fetch_row( $result );
82 mysqli_close( $resource );
83
84 $is_athrift = preg_match( "/^US3-A/i", $gfacID );
85 $is_finished = preg_match( "/^Finished/i", $message );
86
87 if ( $is_athrift )
88 { // Process submitted thru Airavata/Thrift
89 if ( $is_finished )
90 { // Message is "Finished..." : Update message and status
91write_log( "$self process(): Thrift + Finished" );
92 update_db( $db, $requestID, 'finished', $message );
93 update_aira( $gfacID, $message ); // wait for Airvata to deposit data
94 }
95 else
96 { // Other messages : just update message
97//write_log( "$self process(): Thrift, NOT Finished" );
98 $updmsg = 'update';
99 if ( preg_match( "/^Starting/i", $message ) )
100 $updmsg = 'starting';
101 if ( preg_match( "/^Abort/i", $message ) )
102 $updmsg = 'aborted';
103
104 update_db( $db, $requestID, $updmsg, $message );
105 update_gfac( $gfacID, "UPDATING", $message );
106 }
107 }
108
109 else
110 { // Not Airavata/Thrift
111 if ( $is_finished )
112 { // Handle "Finished..." message
113 $hex = "[0-9a-fA-F]";
114
115 if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
116 preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
117 {
118 // Then it's a GFAC job
119 update_db( $db, $requestID, 'finished', $message );
120 update_gfac( $gfacID, "UPDATING", $message ); // wait for GFAC to deposit data
121 notify_gfac_done( $gfacID ); // notify them to go get it
122 }
123
124 else
125 {
126 // It's a local job
127 update_db( $db, $requestID, 'finished', $message );
128 update_gfac( $gfacID, "COMPLETE", $message ); // data should be there already
129 }
130 }
131
132 else if ( preg_match( "/^Starting/i", $message ) )
133 {
134 update_db( $db, $requestID, 'starting', $message );
135 update_gfac( $gfacID, "RUNNING", $message );
136 }
137
138 else if ( preg_match( "/^Abort/i", $message ) )
139 {
140 update_db( $db, $requestID, 'aborted', $message );
141 update_gfac( $gfacID, "CANCELED", $message );
142 }
143
144 else
145 {
146 update_db( $db, $requestID, 'update', $message );
147 update_gfac( $gfacID, "UPDATING", $message );
148 }
149 }
150}
151
152function update_db( $db, $requestID, $action, $message )
153{
154 global $dbhost;
155 global $user;
156 global $passwd;
157 global $self;
158
159 $resource = mysqli_connect( $dbhost, $user, $passwd, $db );
160
161 if ( ! $resource )
162 {
163 write_log( "$self: Could not connect to DB $db " . mysqli_error( $resource ) );
164 return;
165 }
166
167 $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
168 "WHERE HPCAnalysisRequestID=$requestID " .
169 "ORDER BY HPCAnalysisResultID DESC " .
170 "LIMIT 1";
171
172 $result = mysqli_query( $resource, $query );
173
174 if ( ! $result )
175 {
176 write_log( "$self: Bad query: $query" );
177 return;
178 }
179
180 list( $resultID ) = mysqli_fetch_row( $result );
181
182 $query = "UPDATE HPCAnalysisResult SET ";
183
184 switch ( $action )
185 {
186 case "starting":
187 $query .= "queueStatus='running'," .
188 "startTime=now(), ";
189 break;
190
191 case "aborted":
192 $query .= "queueStatus='aborted'," .
193 "endTime=now(), ";
194 break;
195
196 case "finished":
197 $query .= "queueStatus='completed'," .
198 "endTime=now(), ";
199//write_log( "$self process(): $requestID : dbupd : Finished" );
200 break;
201
202 case "update":
203//write_log( "$self process(): $requestID : dbupd : update" );
204 default:
205 break;
206 }
207
208 $query .= "lastMessage='" . mysqli_real_escape_string( $resource, $message ) . "'" .
209 "WHERE HPCAnalysisResultID=$resultID";
210
211 mysqli_query( $resource, $query );
212 mysqli_close( $resource );
213}
214
215// Function to update the global database status
216function update_gfac( $gfacID, $status, $message )
217{
218 global $dbhost;
219 global $guser;
220 global $gpasswd;
221 global $gDB;
222 global $self;
223
224 $allowed_status = array( 'RUNNING',
225 'UPDATING',
226 'CANCELED',
227 'COMPLETE'
228 );
229
230 // Get data from global GFAC DB
231 $gLink = mysqli_connect( $dbhost, $guser, $gpasswd, $gDB );
232 if ( ! $gLink )
233 {
234 write_log( "$self: Could not select DB $gDB " . mysqli_error( $gLink ) );
235 return;
236 }
237
238 $status = strtoupper( $status );
239 if ( ! in_array( $status, $allowed_status ) )
240 {
241 write_log( "$self: update_gfac status $status not allowed" );
242 return;
243 }
244
245 // if 'UPDATING' then we're only updating the queue_messages table
246 if ( $status == 'UPDATING' )
247 {
248 $query = "UPDATE analysis " .
249 "SET queue_msg='" . mysqli_real_escape_string( $gLink, $message ) . "' " .
250 "WHERE gfacID='$gfacID'";
251
252//write_log( "$self process(): updgf-u : status=$status" );
253 mysqli_query( $gLink, $query );
254 }
255
256 else
257 {
258 $query = "UPDATE analysis SET status='$status', " .
259 "queue_msg='" . mysqli_real_escape_string( $gLink, $message ) . "' " .
260 "WHERE gfacID='$gfacID'";
261
262//write_log( "$self process(): updgf-s : status=$status" );
263 mysqli_query( $gLink, $query );
264 }
265
266 // Also update the queue_messages table
267 $query = "SELECT id FROM analysis " .
268 "WHERE gfacID = '$gfacID'";
269 $result = mysqli_query( $gLink, $query );
270 if ( ! $result )
271 {
272 write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
273 return;
274 }
275
276 if ( mysqli_num_rows( $result ) == 0 )
277 {
278 write_log( "$self: can't find $gfacID in GFAC db" );
279 return;
280 }
281
282 list( $aID ) = mysqli_fetch_array( $result );
283
284 $query = "INSERT INTO queue_messages " .
285 "SET analysisID = $aID, " .
286 "message = '" . mysqli_real_escape_string( $gLink, $message ) . "'";
287 $result = mysqli_query( $gLink, $query );
288 if ( ! $result )
289 {
290 write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
291 return;
292 }
293
294 mysqli_close( $gLink );
295}
296
297// function to notify GFAC that the UDP message "Finished" has arrived
298function notify_gfac_done( $gfacID )
299{
300 global $serviceURL;
301 global $self;
302
303 $hex = "[0-9a-fA-F]";
304 if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
305 ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
306 {
307 // Then it's not a GFAC job
308 return false;
309 }
310
311 $url = "$serviceURL/setstatus/$gfacID";
312 try
313 {
314 $post = new HttpRequest( $url, HttpRequest::METH_GET );
315 $http = $post->send();
316 $xml = $post->getResponseBody();
317 }
318 catch ( HttpException $e )
319 {
320 write_log( "$self: Set status unsuccessful - $gfacID" );
321 return false;
322 }
323
324 // Parse the result
325 // Not sure we need to know $gfac_status = parse_response( $xml );
326
327 // return $gfac_status;
328
329 return true;
330}
331
332// Function to update the global database status (AThrift + Finished)
333function update_aira( $gfacID, $message )
334{
335 global $dbhost;
336 global $guser;
337 global $gpasswd;
338 global $gDB;
339 global $self;
340
341 // Get data from global GFAC DB
342 $gLink = mysqli_connect( $dbhost, $guser, $gpasswd, $gDB );
343 if ( ! $gLink )
344 {
345 write_log( "$self: Could not connect to DB $gDB " . mysqli_error( $gLink ) );
346 return;
347 }
348
349 // Update message and update status to 'FINISHED'
350 $query = "UPDATE analysis SET status='FINISHED', " .
351 "queue_msg='" . mysqli_real_escape_string( $gLink, $message ) . "' " .
352 "WHERE gfacID='$gfacID'";
353
354 mysqli_query( $gLink, $query );
355 write_log( "$self: Status FINISHED and 'Finished...' message updated" );
356
357 // Also update the queue_messages table
358 $query = "SELECT id FROM analysis " .
359 "WHERE gfacID = '$gfacID'";
360 $result = mysqli_query( $gLink, $query );
361 if ( ! $result )
362 {
363 write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
364 return;
365 }
366
367 if ( mysqli_num_rows( $result ) == 0 )
368 {
369// write_log( "$self: can't find $gfacID in GFAC db" );
370 return;
371 }
372
373 list( $aID ) = mysqli_fetch_array( $result );
374
375 $query = "INSERT INTO queue_messages " .
376 "SET analysisID = $aID, " .
377 "message = '" . mysqli_real_escape_string( $gLink, $message ) . "'";
378 $result = mysqli_query( $gLink, $query );
379 if ( ! $result )
380 {
381 write_log( "$self: bad query: $query " . mysqli_error( $gLink ) );
382 return;
383 }
384
385 mysqli_close( $gLink );
386}
387?>
Note: See TracBrowser for help on using the repository browser.