source: trunk/manage-us3-pipe.php @ 3

Last change on this file since 3 was 3, checked in by us3, 9 years ago

Added DONE status, check for other unknown statuses that are added

File size: 7.3 KB
Line 
1<?php
2
3include "/home/us3/bin/listen-config.php";
4
5write_log( "$self: Starting" );
6
7$handle = fopen( $pipe, "r+" );
8
9if ( $handle == NULL ) 
10{
11  write_log( "$self: Cannot open pipe" );
12  exit( -1 );
13}
14
15$msg    = "";
16
17// From a pipe, we don't know when the message terminates, so the sender
18// added a null to indicate the end of each message
19do 
20{
21   $input = fgetc( $handle );   // Read one character at a time
22   $msg  .= $input;
23
24   if ( $input[ 0 ] == chr( 0 ) )
25   {
26      // Go do some work
27      $msg = rtrim( $msg );
28      if ( $msg == "Stop listen" ) break;
29      process( $msg );
30      write_log( "$self: $msg" );
31      $msg = "";
32   }
33} while ( true );
34
35write_log( "$self: Stopping" );
36exit();
37
38// The format of the messages would be
39// db-requestID: message ( colon-space )
40function process( $msg )
41{
42   global $dbhost;
43   global $user;
44   global $passwd;
45   global $self;
46
47   $list                   = explode( ": ", $msg );
48   list( $db, $requestID ) = explode( "-",  array_shift( $list ) );
49   $message                = implode( ": ", $list );
50
51   // Convert to integer
52   settype( $requestID, 'integer' );
53
54   // We need the gfacID
55   $resource = mysql_connect( $dbhost, $user, $passwd );
56
57   if ( ! $resource )
58   {
59      write_log( "$self process(): Could not connect to MySQL - " . mysql_error() );
60      write_log( "$self process(): original msg - $msg" );
61      return;
62   }
63
64   if ( ! mysql_select_db( $db, $resource ) )
65   {
66     write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
67     write_log( "$self process(): original msg - $msg" );
68     return;
69   }
70
71   $query = "SELECT gfacID FROM HPCAnalysisResult " .
72            "WHERE HPCAnalysisRequestID=$requestID "             .
73            "ORDER BY HPCAnalysisResultID DESC "                 .
74            "LIMIT 1";
75
76   $result = mysql_query( $query, $resource );
77   
78   if ( ! $result )
79   {
80     write_log( "$self process(): Bad query: $query" );
81     write_log( "$self process(): original msg - $msg" );
82     return;
83   }
84
85   list( $gfacID ) = mysql_fetch_row( $result );
86   mysql_close( $resource );
87
88   // Now update the databases
89   if ( preg_match( "/^Starting/i", $message ) )
90   {
91     update_db( $db, $requestID, 'starting', $message );
92     update_gfac( $gfacID, "RUNNING", $message );
93   }
94
95   else if ( preg_match( "/^Abort/i", $message ) )
96   {
97     update_db( $db, $requestID, 'aborted', $message );
98     update_gfac( $gfacID, "CANCELED", $message );
99   }
100
101   else if ( preg_match( "/^Finished/i", $message ) )
102   {
103     update_db( $db, $requestID, 'finished', $message );
104
105     $hex = "[0-9a-fA-F]";
106     if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
107          preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
108      {
109        // Then it's a GFAC job
110        update_gfac( $gfacID, "UPDATING", $message );     // wait for GFAC to deposit data
111        notify_gfac_done( $gfacID );                      // notify them to go get it
112      }
113
114      else
115      {
116        // It's a local job
117        update_gfac( $gfacID, "COMPLETE", $message );     // data should be there already
118      }
119   }
120
121   else
122   {
123     update_db( $db, $requestID, 'update', $message );
124     update_gfac( $gfacID, "UPDATING", $message );
125   }
126}
127
128function update_db( $db, $requestID, $action, $message )
129{
130   global $dbhost;
131   global $user;
132   global $passwd;
133   global $self;
134
135   $resource = mysql_connect( $dbhost, $user, $passwd );
136
137   if ( ! $resource )
138   {
139      write_log( "$self: Could not connect to DB" );
140      return;
141   }
142
143   if ( ! mysql_select_db( $db, $resource ) )
144   {
145     write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
146     return;
147   }
148
149   $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
150            "WHERE HPCAnalysisRequestID=$requestID "             .
151            "ORDER BY HPCAnalysisResultID DESC "                 .
152            "LIMIT 1";
153
154   $result = mysql_query( $query, $resource );
155   
156   if ( ! $result )
157   {
158     write_log( "$self: Bad query: $query" );
159     return;
160   }
161
162   list( $resultID ) = mysql_fetch_row( $result );
163
164   $query = "UPDATE HPCAnalysisResult SET ";
165
166   switch ( $action )
167   {
168      case "starting":
169         $query .= "queueStatus='running'," .
170                   "startTime=now(), ";
171         break;
172
173      case "aborted":
174         $query .= "queueStatus='aborted'," .
175                   "endTime=now(), ";
176         break;
177
178      case "finished":
179         $query .= "queueStatus='completed'," .
180                   "endTime=now(), ";
181         break;
182
183      default:
184         $query .= "queueStatus='running',";
185         break;
186   }
187
188   $query .= "lastMessage='" . mysql_real_escape_string( $message ) . "'" .
189             "WHERE HPCAnalysisResultID=$resultID";
190
191   mysql_query( $query, $resource );
192   mysql_close( $resource );
193}
194
195// Function to update the global database status
196function update_gfac( $gfacID, $status, $message )
197{
198  global $dbhost;
199  global $guser;
200  global $gpasswd;
201  global $gDB;
202  global $self;
203
204  $allowed_status = array( 'RUNNING',
205                           'UPDATING',
206                           'CANCELED',
207                           'COMPLETE'
208                         );
209
210  // Get data from global GFAC DB
211  $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
212  if ( ! mysql_select_db( $gDB, $gLink ) )
213  {
214    write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
215    return;
216  }
217
218  $status = strtoupper( $status );
219  if ( ! in_array( $status, $allowed_status ) )
220  {
221    write_log( "$self: update_gfac status $status not allowed" );
222    return;
223  }
224
225  // if 'UPDATING' then we're only updating the queue_messages table
226  if ( $status != 'UPDATING' )
227  {
228     $query = "UPDATE analysis SET status='$status' " .
229              "WHERE gfacID='$gfacID'";
230
231     mysql_query( $query, $gLink );
232  }
233
234  // Also update the queue_messages table
235  $query  = "SELECT id FROM analysis " .
236            "WHERE gfacID = '$gfacID'";
237  $result = mysql_query( $query, $gLink );
238  if ( ! $result )
239  {
240    write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
241    return;
242  }
243
244  if ( mysql_num_rows( $result ) == 0 )
245  {
246    write_log( "$self: can't find $gfacID in GFAC db" );
247    return;
248  }
249
250  list( $aID ) = mysql_fetch_array( $result );
251
252  $query  = "INSERT INTO queue_messages " .
253            "SET analysisID = $aID, " .
254            "message = '" . mysql_real_escape_string( $message ) . "'";
255  $result = mysql_query( $query, $gLink );
256  if ( ! $result )
257  {
258    write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
259    return;
260  }
261
262  mysql_close( $gLink );
263}
264
265// function to notify GFAC that the UDP message "Finished" has arrived
266function notify_gfac_done( $gfacID )
267{
268  global $serviceURL;
269
270  $hex = "[0-9a-fA-F]";
271  if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
272       ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
273   {
274      // Then it's not a GFAC job
275      return false;
276   }
277
278   $url = "$serviceURL/setstatus/$gfacID";
279   try
280   {
281      $post = new HttpRequest( $url, HttpRequest::METH_GET );
282      $http = $post->send();
283      $xml  = $post->getResponseBody();     
284   }
285   catch ( HttpException $e )
286   {
287      write_log( "$self: Set status unsuccessful -  $gfacID" );
288      return false;
289   }
290
291   // Parse the result
292   // Not sure we need to know $gfac_status = parse_response( $xml );
293
294   // return $gfac_status;
295
296   return true;
297}
298?>
Note: See TracBrowser for help on using the repository browser.