source: trunk/manage-us3-pipe.php@ 3

Last change on this file since 3 was 3, checked in by us3, 13 years ago

Added DONE status, check for other unknown statuses that are added

File size: 7.3 KB
RevLine 
[1]1<?php
2
3include "/home/us3/bin/listen-config.php";
4
5write_log( "$self: Starting" );
6
7$handle = fopen( $pipe, "r+" );
8
9if ( $handle == NULL )
10{
11 write_log( "$self: Cannot open pipe" );
12 exit( -1 );
13}
14
15$msg = "";
16
17// From a pipe, we don't know when the message terminates, so the sender
18// added a null to indicate the end of each message
19do
20{
21 $input = fgetc( $handle ); // Read one character at a time
22 $msg .= $input;
23
24 if ( $input[ 0 ] == chr( 0 ) )
25 {
26 // Go do some work
27 $msg = rtrim( $msg );
28 if ( $msg == "Stop listen" ) break;
29 process( $msg );
30 write_log( "$self: $msg" );
31 $msg = "";
32 }
33} while ( true );
34
35write_log( "$self: Stopping" );
36exit();
37
38// The format of the messages would be
39// db-requestID: message ( colon-space )
40function process( $msg )
41{
42 global $dbhost;
43 global $user;
44 global $passwd;
[3]45 global $self;
[1]46
47 $list = explode( ": ", $msg );
48 list( $db, $requestID ) = explode( "-", array_shift( $list ) );
49 $message = implode( ": ", $list );
50
51 // Convert to integer
52 settype( $requestID, 'integer' );
53
54 // We need the gfacID
55 $resource = mysql_connect( $dbhost, $user, $passwd );
56
57 if ( ! $resource )
58 {
[3]59 write_log( "$self process(): Could not connect to MySQL - " . mysql_error() );
[1]60 write_log( "$self process(): original msg - $msg" );
61 return;
62 }
63
64 if ( ! mysql_select_db( $db, $resource ) )
65 {
66 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
67 write_log( "$self process(): original msg - $msg" );
68 return;
69 }
70
71 $query = "SELECT gfacID FROM HPCAnalysisResult " .
72 "WHERE HPCAnalysisRequestID=$requestID " .
73 "ORDER BY HPCAnalysisResultID DESC " .
74 "LIMIT 1";
75
76 $result = mysql_query( $query, $resource );
77
78 if ( ! $result )
79 {
80 write_log( "$self process(): Bad query: $query" );
81 write_log( "$self process(): original msg - $msg" );
82 return;
83 }
84
85 list( $gfacID ) = mysql_fetch_row( $result );
86 mysql_close( $resource );
87
88 // Now update the databases
89 if ( preg_match( "/^Starting/i", $message ) )
90 {
91 update_db( $db, $requestID, 'starting', $message );
92 update_gfac( $gfacID, "RUNNING", $message );
93 }
94
95 else if ( preg_match( "/^Abort/i", $message ) )
96 {
97 update_db( $db, $requestID, 'aborted', $message );
98 update_gfac( $gfacID, "CANCELED", $message );
99 }
100
101 else if ( preg_match( "/^Finished/i", $message ) )
102 {
103 update_db( $db, $requestID, 'finished', $message );
104
105 $hex = "[0-9a-fA-F]";
106 if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
107 preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
108 {
109 // Then it's a GFAC job
110 update_gfac( $gfacID, "UPDATING", $message ); // wait for GFAC to deposit data
111 notify_gfac_done( $gfacID ); // notify them to go get it
112 }
113
114 else
115 {
116 // It's a local job
117 update_gfac( $gfacID, "COMPLETE", $message ); // data should be there already
118 }
119 }
120
121 else
122 {
123 update_db( $db, $requestID, 'update', $message );
124 update_gfac( $gfacID, "UPDATING", $message );
125 }
126}
127
128function update_db( $db, $requestID, $action, $message )
129{
130 global $dbhost;
131 global $user;
132 global $passwd;
133 global $self;
134
135 $resource = mysql_connect( $dbhost, $user, $passwd );
136
137 if ( ! $resource )
138 {
139 write_log( "$self: Could not connect to DB" );
140 return;
141 }
142
143 if ( ! mysql_select_db( $db, $resource ) )
144 {
145 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
146 return;
147 }
148
149 $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
150 "WHERE HPCAnalysisRequestID=$requestID " .
151 "ORDER BY HPCAnalysisResultID DESC " .
152 "LIMIT 1";
153
154 $result = mysql_query( $query, $resource );
155
156 if ( ! $result )
157 {
158 write_log( "$self: Bad query: $query" );
159 return;
160 }
161
162 list( $resultID ) = mysql_fetch_row( $result );
163
164 $query = "UPDATE HPCAnalysisResult SET ";
165
166 switch ( $action )
167 {
168 case "starting":
169 $query .= "queueStatus='running'," .
170 "startTime=now(), ";
171 break;
172
173 case "aborted":
174 $query .= "queueStatus='aborted'," .
175 "endTime=now(), ";
176 break;
177
178 case "finished":
179 $query .= "queueStatus='completed'," .
180 "endTime=now(), ";
181 break;
182
183 default:
184 $query .= "queueStatus='running',";
185 break;
186 }
187
188 $query .= "lastMessage='" . mysql_real_escape_string( $message ) . "'" .
189 "WHERE HPCAnalysisResultID=$resultID";
190
191 mysql_query( $query, $resource );
192 mysql_close( $resource );
193}
194
195// Function to update the global database status
196function update_gfac( $gfacID, $status, $message )
197{
198 global $dbhost;
199 global $guser;
200 global $gpasswd;
201 global $gDB;
202 global $self;
203
204 $allowed_status = array( 'RUNNING',
205 'UPDATING',
206 'CANCELED',
207 'COMPLETE'
208 );
209
210 // Get data from global GFAC DB
211 $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
212 if ( ! mysql_select_db( $gDB, $gLink ) )
213 {
214 write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
215 return;
216 }
217
218 $status = strtoupper( $status );
219 if ( ! in_array( $status, $allowed_status ) )
220 {
221 write_log( "$self: update_gfac status $status not allowed" );
222 return;
223 }
224
225 // if 'UPDATING' then we're only updating the queue_messages table
226 if ( $status != 'UPDATING' )
227 {
228 $query = "UPDATE analysis SET status='$status' " .
229 "WHERE gfacID='$gfacID'";
230
231 mysql_query( $query, $gLink );
232 }
233
234 // Also update the queue_messages table
235 $query = "SELECT id FROM analysis " .
236 "WHERE gfacID = '$gfacID'";
237 $result = mysql_query( $query, $gLink );
238 if ( ! $result )
239 {
240 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
241 return;
242 }
243
244 if ( mysql_num_rows( $result ) == 0 )
245 {
246 write_log( "$self: can't find $gfacID in GFAC db" );
247 return;
248 }
249
250 list( $aID ) = mysql_fetch_array( $result );
251
252 $query = "INSERT INTO queue_messages " .
253 "SET analysisID = $aID, " .
254 "message = '" . mysql_real_escape_string( $message ) . "'";
255 $result = mysql_query( $query, $gLink );
256 if ( ! $result )
257 {
258 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
259 return;
260 }
261
262 mysql_close( $gLink );
263}
264
265// function to notify GFAC that the UDP message "Finished" has arrived
266function notify_gfac_done( $gfacID )
267{
268 global $serviceURL;
269
270 $hex = "[0-9a-fA-F]";
271 if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
272 ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
273 {
274 // Then it's not a GFAC job
275 return false;
276 }
277
278 $url = "$serviceURL/setstatus/$gfacID";
279 try
280 {
281 $post = new HttpRequest( $url, HttpRequest::METH_GET );
282 $http = $post->send();
283 $xml = $post->getResponseBody();
284 }
285 catch ( HttpException $e )
286 {
287 write_log( "$self: Set status unsuccessful - $gfacID" );
288 return false;
289 }
290
291 // Parse the result
292 // Not sure we need to know $gfac_status = parse_response( $xml );
293
294 // return $gfac_status;
295
296 return true;
297}
298?>
Note: See TracBrowser for help on using the repository browser.