source: trunk/manage-us3-pipe.php@ 1

Last change on this file since 1 was 1, checked in by zollarsd, 13 years ago

Initial import

File size: 7.3 KB
RevLine 
[1]1<?php
2
3include "/home/us3/bin/listen-config.php";
4
5write_log( "$self: Starting" );
6
7$handle = fopen( $pipe, "r+" );
8
9if ( $handle == NULL )
10{
11 write_log( "$self: Cannot open pipe" );
12 exit( -1 );
13}
14
15$msg = "";
16
17// From a pipe, we don't know when the message terminates, so the sender
18// added a null to indicate the end of each message
19do
20{
21 $input = fgetc( $handle ); // Read one character at a time
22 $msg .= $input;
23
24 if ( $input[ 0 ] == chr( 0 ) )
25 {
26 // Go do some work
27 $msg = rtrim( $msg );
28 if ( $msg == "Stop listen" ) break;
29 process( $msg );
30 write_log( "$self: $msg" );
31 $msg = "";
32 }
33} while ( true );
34
35write_log( "$self: Stopping" );
36exit();
37
38// The format of the messages would be
39// db-requestID: message ( colon-space )
40function process( $msg )
41{
42 global $dbhost;
43 global $user;
44 global $passwd;
45
46 $list = explode( ": ", $msg );
47 list( $db, $requestID ) = explode( "-", array_shift( $list ) );
48 $message = implode( ": ", $list );
49
50 // Convert to integer
51 settype( $requestID, 'integer' );
52
53 // We need the gfacID
54 $resource = mysql_connect( $dbhost, $user, $passwd );
55
56 if ( ! $resource )
57 {
58 write_log( "$self process(): Could not connect to DB" );
59 write_log( "$self process(): original msg - $msg" );
60 return;
61 }
62
63 if ( ! mysql_select_db( $db, $resource ) )
64 {
65 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
66 write_log( "$self process(): original msg - $msg" );
67 return;
68 }
69
70 $query = "SELECT gfacID FROM HPCAnalysisResult " .
71 "WHERE HPCAnalysisRequestID=$requestID " .
72 "ORDER BY HPCAnalysisResultID DESC " .
73 "LIMIT 1";
74
75 $result = mysql_query( $query, $resource );
76
77 if ( ! $result )
78 {
79 write_log( "$self process(): Bad query: $query" );
80 write_log( "$self process(): original msg - $msg" );
81 return;
82 }
83
84 list( $gfacID ) = mysql_fetch_row( $result );
85 mysql_close( $resource );
86
87 // Now update the databases
88 if ( preg_match( "/^Starting/i", $message ) )
89 {
90 update_db( $db, $requestID, 'starting', $message );
91 update_gfac( $gfacID, "RUNNING", $message );
92 }
93
94 else if ( preg_match( "/^Abort/i", $message ) )
95 {
96 update_db( $db, $requestID, 'aborted', $message );
97 update_gfac( $gfacID, "CANCELED", $message );
98 }
99
100 else if ( preg_match( "/^Finished/i", $message ) )
101 {
102 update_db( $db, $requestID, 'finished', $message );
103
104 $hex = "[0-9a-fA-F]";
105 if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
106 preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
107 {
108 // Then it's a GFAC job
109 update_gfac( $gfacID, "UPDATING", $message ); // wait for GFAC to deposit data
110 notify_gfac_done( $gfacID ); // notify them to go get it
111 }
112
113 else
114 {
115 // It's a local job
116 update_gfac( $gfacID, "COMPLETE", $message ); // data should be there already
117 }
118 }
119
120 else
121 {
122 update_db( $db, $requestID, 'update', $message );
123 update_gfac( $gfacID, "UPDATING", $message );
124 }
125}
126
127function update_db( $db, $requestID, $action, $message )
128{
129 global $dbhost;
130 global $user;
131 global $passwd;
132 global $self;
133
134 $resource = mysql_connect( $dbhost, $user, $passwd );
135
136 if ( ! $resource )
137 {
138 write_log( "$self: Could not connect to DB" );
139 return;
140 }
141
142 if ( ! mysql_select_db( $db, $resource ) )
143 {
144 write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
145 return;
146 }
147
148 $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
149 "WHERE HPCAnalysisRequestID=$requestID " .
150 "ORDER BY HPCAnalysisResultID DESC " .
151 "LIMIT 1";
152
153 $result = mysql_query( $query, $resource );
154
155 if ( ! $result )
156 {
157 write_log( "$self: Bad query: $query" );
158 return;
159 }
160
161 list( $resultID ) = mysql_fetch_row( $result );
162
163 $query = "UPDATE HPCAnalysisResult SET ";
164
165 switch ( $action )
166 {
167 case "starting":
168 $query .= "queueStatus='running'," .
169 "startTime=now(), ";
170 break;
171
172 case "aborted":
173 $query .= "queueStatus='aborted'," .
174 "endTime=now(), ";
175 break;
176
177 case "finished":
178 $query .= "queueStatus='completed'," .
179 "endTime=now(), ";
180 break;
181
182 default:
183 $query .= "queueStatus='running',";
184 break;
185 }
186
187 $query .= "lastMessage='" . mysql_real_escape_string( $message ) . "'" .
188 "WHERE HPCAnalysisResultID=$resultID";
189
190 mysql_query( $query, $resource );
191 mysql_close( $resource );
192}
193
194// Function to update the global database status
195function update_gfac( $gfacID, $status, $message )
196{
197 global $dbhost;
198 global $guser;
199 global $gpasswd;
200 global $gDB;
201 global $self;
202
203 $allowed_status = array( 'RUNNING',
204 'UPDATING',
205 'CANCELED',
206 'COMPLETE'
207 );
208
209 // Get data from global GFAC DB
210 $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
211 if ( ! mysql_select_db( $gDB, $gLink ) )
212 {
213 write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
214 return;
215 }
216
217 $status = strtoupper( $status );
218 if ( ! in_array( $status, $allowed_status ) )
219 {
220 write_log( "$self: update_gfac status $status not allowed" );
221 return;
222 }
223
224 // if 'UPDATING' then we're only updating the queue_messages table
225 if ( $status != 'UPDATING' )
226 {
227 $query = "UPDATE analysis SET status='$status' " .
228 "WHERE gfacID='$gfacID'";
229
230 mysql_query( $query, $gLink );
231 }
232
233 // Also update the queue_messages table
234 $query = "SELECT id FROM analysis " .
235 "WHERE gfacID = '$gfacID'";
236 $result = mysql_query( $query, $gLink );
237 if ( ! $result )
238 {
239 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
240 return;
241 }
242
243 if ( mysql_num_rows( $result ) == 0 )
244 {
245 write_log( "$self: can't find $gfacID in GFAC db" );
246 return;
247 }
248
249 list( $aID ) = mysql_fetch_array( $result );
250
251 $query = "INSERT INTO queue_messages " .
252 "SET analysisID = $aID, " .
253 "message = '" . mysql_real_escape_string( $message ) . "'";
254 $result = mysql_query( $query, $gLink );
255 if ( ! $result )
256 {
257 write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
258 return;
259 }
260
261 mysql_close( $gLink );
262}
263
264// function to notify GFAC that the UDP message "Finished" has arrived
265function notify_gfac_done( $gfacID )
266{
267 global $serviceURL;
268
269 $hex = "[0-9a-fA-F]";
270 if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
271 ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
272 {
273 // Then it's not a GFAC job
274 return false;
275 }
276
277 $url = "$serviceURL/setstatus/$gfacID";
278 try
279 {
280 $post = new HttpRequest( $url, HttpRequest::METH_GET );
281 $http = $post->send();
282 $xml = $post->getResponseBody();
283 }
284 catch ( HttpException $e )
285 {
286 write_log( "$self: Set status unsuccessful - $gfacID" );
287 return false;
288 }
289
290 // Parse the result
291 // Not sure we need to know $gfac_status = parse_response( $xml );
292
293 // return $gfac_status;
294
295 return true;
296}
297?>
Note: See TracBrowser for help on using the repository browser.