source: trunk/manage-us3-pipe.php@ 6

Last change on this file since 6 was 6, checked in by gegorbet, 9 years ago

changes since 2013, mostly for airvata/thrift

File size: 10.6 KB
Line 
1<?php
2
3$us3bin = exec( "ls -d ~us3/bin" );
4include "$us3bin/listen-config.php";
5include "/srv/www/htdocs/common/class/experiment_status.php";
6
7write_log( "$self: Starting" );
8
9$handle = fopen( $pipe, "r+" );
10
11if ( $handle == NULL )
12{
13 write_log( "$self: Cannot open pipe" );
14 exit( -1 );
15}
16
17$msg = "";
18
19// From a pipe, we don't know when the message terminates, so the sender
20// added a null to indicate the end of each message
21do
22{
23 $input = fgetc( $handle ); // Read one character at a time
24 $msg .= $input;
25
26 if ( $input[ 0 ] == chr( 0 ) )
27 {
28 // Go do some work
29 $msg = rtrim( $msg );
30 if ( $msg == "Stop listen" ) break;
31 process( $msg );
32 write_log( "$self: $msg" );
33 $msg = "";
34 }
35} while ( true );
36
37write_log( "$self: Stopping" );
38exit();
39
40// The format of the messages would be
41// db-requestID: message ( colon-space )
42function process( $msg )
43{
44 global $dbhost;
45 global $user;
46 global $passwd;
47 global $self;
48
49 $list = explode( ": ", $msg );
50 list( $db, $requestID ) = explode( "-", array_shift( $list ) );
51 $message = implode( ": ", $list );
52
53 // Convert to integer
54 settype( $requestID, 'integer' );
55
56 // We need the gfacID
57 $resource = mysql_connect( $dbhost, $user, $passwd );
58
59 if ( ! $resource )
60 {
61 write_log( "$self process(): Could not connect to MySQL - " . mysql_error() );
62 write_log( "$self process(): original msg - $msg" );
63 return;
64 }
65
66 if ( ! mysql_select_db( $db, $resource ) )
67 {
68 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
69 write_log( "$self process(): original msg - $msg" );
70 return;
71 }
72
73 $query = "SELECT gfacID FROM HPCAnalysisResult " .
74 "WHERE HPCAnalysisRequestID=$requestID " .
75 "ORDER BY HPCAnalysisResultID DESC " .
76 "LIMIT 1";
77
78 $result = mysql_query( $query, $resource );
79
80 if ( ! $result )
81 {
82 write_log( "$self process(): Bad query: $query" );
83 write_log( "$self process(): original msg - $msg" );
84 return;
85 }
86
87 // Set flags for Airavata/Thrift and "Finished..."
88 list( $gfacID ) = mysql_fetch_row( $result );
89 mysql_close( $resource );
90
91 $is_athrift = preg_match( "/^US3-AIRA/i", $gfacID );
92 $is_finished = preg_match( "/^Finished/i", $message );
93
94 if ( $is_athrift )
95 { // Process submitted thru Airavata/Thrift
96 if ( $is_finished )
97 { // Message is "Finished..." : Update message and status
98write_log( "$self process(): Thrift + Finished" );
99 update_db( $db, $requestID, 'finished', $message );
100 update_aira( $gfacID, $message ); // wait for Airvata to deposit data
101 }
102 else
103 { // Other messages : just update message
104//write_log( "$self process(): Thrift, NOT Finished" );
105 $updmsg = 'update';
106 if ( preg_match( "/^Starting/i", $message ) )
107 $updmsg = 'starting';
108 if ( preg_match( "/^Abort/i", $message ) )
109 $updmsg = 'aborted';
110
111 update_db( $db, $requestID, $updmsg, $message );
112 update_gfac( $gfacID, "UPDATING", $message );
113 }
114 }
115
116 else
117 { // Not Airavata/Thrift
118 if ( $is_finished )
119 { // Handle "Finished..." message
120 $hex = "[0-9a-fA-F]";
121
122 if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
123 preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
124 {
125 // Then it's a GFAC job
126 update_db( $db, $requestID, 'finished', $message );
127 update_gfac( $gfacID, "UPDATING", $message ); // wait for GFAC to deposit data
128 notify_gfac_done( $gfacID ); // notify them to go get it
129 }
130
131 else
132 {
133 // It's a local job
134 update_db( $db, $requestID, 'finished', $message );
135 update_gfac( $gfacID, "COMPLETE", $message ); // data should be there already
136 }
137 }
138
139 else if ( preg_match( "/^Starting/i", $message ) )
140 {
141 update_db( $db, $requestID, 'starting', $message );
142 update_gfac( $gfacID, "RUNNING", $message );
143 }
144
145 else if ( preg_match( "/^Abort/i", $message ) )
146 {
147 update_db( $db, $requestID, 'aborted', $message );
148 update_gfac( $gfacID, "CANCELED", $message );
149 }
150
151 else
152 {
153 update_db( $db, $requestID, 'update', $message );
154 update_gfac( $gfacID, "UPDATING", $message );
155 }
156 }
157}
158
159function update_db( $db, $requestID, $action, $message )
160{
161 global $dbhost;
162 global $user;
163 global $passwd;
164 global $self;
165
166 $resource = mysql_connect( $dbhost, $user, $passwd );
167
168 if ( ! $resource )
169 {
170 write_log( "$self: Could not connect to DB" );
171 return;
172 }
173
174 if ( ! mysql_select_db( $db, $resource ) )
175 {
176 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
177 return;
178 }
179
180 $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
181 "WHERE HPCAnalysisRequestID=$requestID " .
182 "ORDER BY HPCAnalysisResultID DESC " .
183 "LIMIT 1";
184
185 $result = mysql_query( $query, $resource );
186
187 if ( ! $result )
188 {
189 write_log( "$self: Bad query: $query" );
190 return;
191 }
192
193 list( $resultID ) = mysql_fetch_row( $result );
194
195 $query = "UPDATE HPCAnalysisResult SET ";
196
197 switch ( $action )
198 {
199 case "starting":
200 $query .= "queueStatus='running'," .
201 "startTime=now(), ";
202 break;
203
204 case "aborted":
205 $query .= "queueStatus='aborted'," .
206 "endTime=now(), ";
207 break;
208
209 case "finished":
210 $query .= "queueStatus='completed'," .
211 "endTime=now(), ";
212//write_log( "$self process(): $requestID : dbupd : Finished" );
213 break;
214
215 case "update":
216//write_log( "$self process(): $requestID : dbupd : update" );
217 default:
218 break;
219 }
220
221 $query .= "lastMessage='" . mysql_real_escape_string( $message ) . "'" .
222 "WHERE HPCAnalysisResultID=$resultID";
223
224 mysql_query( $query, $resource );
225 mysql_close( $resource );
226}
227
228// Function to update the global database status
229function update_gfac( $gfacID, $status, $message )
230{
231 global $dbhost;
232 global $guser;
233 global $gpasswd;
234 global $gDB;
235 global $self;
236
237 $allowed_status = array( 'RUNNING',
238 'UPDATING',
239 'CANCELED',
240 'COMPLETE'
241 );
242
243 // Get data from global GFAC DB
244 $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
245 if ( ! mysql_select_db( $gDB, $gLink ) )
246 {
247 write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
248 return;
249 }
250
251 $status = strtoupper( $status );
252 if ( ! in_array( $status, $allowed_status ) )
253 {
254 write_log( "$self: update_gfac status $status not allowed" );
255 return;
256 }
257
258 // if 'UPDATING' then we're only updating the queue_messages table
259 if ( $status == 'UPDATING' )
260 {
261 $query = "UPDATE analysis " .
262 "SET queue_msg='" . mysql_real_escape_string( $message ) . "' " .
263 "WHERE gfacID='$gfacID'";
264
265//write_log( "$self process(): updgf-u : status=$status" );
266 mysql_query( $query, $gLink );
267 }
268
269 else
270 {
271 $query = "UPDATE analysis SET status='$status', " .
272 "queue_msg='" . mysql_real_escape_string( $message ) . "' " .
273 "WHERE gfacID='$gfacID'";
274
275//write_log( "$self process(): updgf-s : status=$status" );
276 mysql_query( $query, $gLink );
277 }
278
279 // Also update the queue_messages table
280 $query = "SELECT id FROM analysis " .
281 "WHERE gfacID = '$gfacID'";
282 $result = mysql_query( $query, $gLink );
283 if ( ! $result )
284 {
285 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
286 return;
287 }
288
289 if ( mysql_num_rows( $result ) == 0 )
290 {
291 write_log( "$self: can't find $gfacID in GFAC db" );
292 return;
293 }
294
295 list( $aID ) = mysql_fetch_array( $result );
296
297 $query = "INSERT INTO queue_messages " .
298 "SET analysisID = $aID, " .
299 "message = '" . mysql_real_escape_string( $message ) . "'";
300 $result = mysql_query( $query, $gLink );
301 if ( ! $result )
302 {
303 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
304 return;
305 }
306
307 mysql_close( $gLink );
308}
309
310// function to notify GFAC that the UDP message "Finished" has arrived
311function notify_gfac_done( $gfacID )
312{
313 global $serviceURL;
314 global $self;
315
316 $hex = "[0-9a-fA-F]";
317 if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
318 ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
319 {
320 // Then it's not a GFAC job
321 return false;
322 }
323
324 $url = "$serviceURL/setstatus/$gfacID";
325 try
326 {
327 $post = new HttpRequest( $url, HttpRequest::METH_GET );
328 $http = $post->send();
329 $xml = $post->getResponseBody();
330 }
331 catch ( HttpException $e )
332 {
333 write_log( "$self: Set status unsuccessful - $gfacID" );
334 return false;
335 }
336
337 // Parse the result
338 // Not sure we need to know $gfac_status = parse_response( $xml );
339
340 // return $gfac_status;
341
342 return true;
343}
344
345// Function to update the global database status (AThrift + Finished)
346function update_aira( $gfacID, $message )
347{
348 global $dbhost;
349 global $guser;
350 global $gpasswd;
351 global $gDB;
352 global $self;
353
354 // Get data from global GFAC DB
355 $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
356 if ( ! mysql_select_db( $gDB, $gLink ) )
357 {
358 write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
359 return;
360 }
361
362 // Update message and update status to 'FINISHED'
363 $query = "UPDATE analysis SET status='FINISHED', " .
364 "queue_msg='" . mysql_real_escape_string( $message ) . "' " .
365 "WHERE gfacID='$gfacID'";
366
367 mysql_query( $query, $gLink );
368 write_log( "$self: Status FINISHED and 'Finished...' message updated" );
369
370 // Also update the queue_messages table
371 $query = "SELECT id FROM analysis " .
372 "WHERE gfacID = '$gfacID'";
373 $result = mysql_query( $query, $gLink );
374 if ( ! $result )
375 {
376 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
377 return;
378 }
379
380 if ( mysql_num_rows( $result ) == 0 )
381 {
382// write_log( "$self: can't find $gfacID in GFAC db" );
383 return;
384 }
385
386 list( $aID ) = mysql_fetch_array( $result );
387
388 $query = "INSERT INTO queue_messages " .
389 "SET analysisID = $aID, " .
390 "message = '" . mysql_real_escape_string( $message ) . "'";
391 $result = mysql_query( $query, $gLink );
392 if ( ! $result )
393 {
394 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
395 return;
396 }
397
398 mysql_close( $gLink );
399}
400?>
Note: See TracBrowser for help on using the repository browser.