source: trunk/manage-us3-pipe.php @ 1

Last change on this file since 1 was 1, checked in by zollarsd, 10 years ago

Initial import

File size: 7.3 KB
Line 
1<?php
2
3include "/home/us3/bin/listen-config.php";
4
5write_log( "$self: Starting" );
6
7$handle = fopen( $pipe, "r+" );
8
9if ( $handle == NULL ) 
10{
11  write_log( "$self: Cannot open pipe" );
12  exit( -1 );
13}
14
15$msg    = "";
16
17// From a pipe, we don't know when the message terminates, so the sender
18// added a null to indicate the end of each message
19do 
20{
21   $input = fgetc( $handle );   // Read one character at a time
22   $msg  .= $input;
23
24   if ( $input[ 0 ] == chr( 0 ) )
25   {
26      // Go do some work
27      $msg = rtrim( $msg );
28      if ( $msg == "Stop listen" ) break;
29      process( $msg );
30      write_log( "$self: $msg" );
31      $msg = "";
32   }
33} while ( true );
34
35write_log( "$self: Stopping" );
36exit();
37
38// The format of the messages would be
39// db-requestID: message ( colon-space )
40function process( $msg )
41{
42   global $dbhost;
43   global $user;
44   global $passwd;
45
46   $list                   = explode( ": ", $msg );
47   list( $db, $requestID ) = explode( "-",  array_shift( $list ) );
48   $message                = implode( ": ", $list );
49
50   // Convert to integer
51   settype( $requestID, 'integer' );
52
53   // We need the gfacID
54   $resource = mysql_connect( $dbhost, $user, $passwd );
55
56   if ( ! $resource )
57   {
58      write_log( "$self process(): Could not connect to DB" );
59      write_log( "$self process(): original msg - $msg" );
60      return;
61   }
62
63   if ( ! mysql_select_db( $db, $resource ) )
64   {
65     write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
66     write_log( "$self process(): original msg - $msg" );
67     return;
68   }
69
70   $query = "SELECT gfacID FROM HPCAnalysisResult " .
71            "WHERE HPCAnalysisRequestID=$requestID "             .
72            "ORDER BY HPCAnalysisResultID DESC "                 .
73            "LIMIT 1";
74
75   $result = mysql_query( $query, $resource );
76   
77   if ( ! $result )
78   {
79     write_log( "$self process(): Bad query: $query" );
80     write_log( "$self process(): original msg - $msg" );
81     return;
82   }
83
84   list( $gfacID ) = mysql_fetch_row( $result );
85   mysql_close( $resource );
86
87   // Now update the databases
88   if ( preg_match( "/^Starting/i", $message ) )
89   {
90     update_db( $db, $requestID, 'starting', $message );
91     update_gfac( $gfacID, "RUNNING", $message );
92   }
93
94   else if ( preg_match( "/^Abort/i", $message ) )
95   {
96     update_db( $db, $requestID, 'aborted', $message );
97     update_gfac( $gfacID, "CANCELED", $message );
98   }
99
100   else if ( preg_match( "/^Finished/i", $message ) )
101   {
102     update_db( $db, $requestID, 'finished', $message );
103
104     $hex = "[0-9a-fA-F]";
105     if ( preg_match( "/^US3-Experiment/i", $gfacID ) ||
106          preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
107      {
108        // Then it's a GFAC job
109        update_gfac( $gfacID, "UPDATING", $message );     // wait for GFAC to deposit data
110        notify_gfac_done( $gfacID );                      // notify them to go get it
111      }
112
113      else
114      {
115        // It's a local job
116        update_gfac( $gfacID, "COMPLETE", $message );     // data should be there already
117      }
118   }
119
120   else
121   {
122     update_db( $db, $requestID, 'update', $message );
123     update_gfac( $gfacID, "UPDATING", $message );
124   }
125}
126
127function update_db( $db, $requestID, $action, $message )
128{
129   global $dbhost;
130   global $user;
131   global $passwd;
132   global $self;
133
134   $resource = mysql_connect( $dbhost, $user, $passwd );
135
136   if ( ! $resource )
137   {
138      write_log( "$self: Could not connect to DB" );
139      return;
140   }
141
142   if ( ! mysql_select_db( $db, $resource ) )
143   {
144     write_log( "$self: Could not select DB $db" . mysql_error( $resource ) );
145     return;
146   }
147
148   $query = "SELECT HPCAnalysisResultID FROM HPCAnalysisResult " .
149            "WHERE HPCAnalysisRequestID=$requestID "             .
150            "ORDER BY HPCAnalysisResultID DESC "                 .
151            "LIMIT 1";
152
153   $result = mysql_query( $query, $resource );
154   
155   if ( ! $result )
156   {
157     write_log( "$self: Bad query: $query" );
158     return;
159   }
160
161   list( $resultID ) = mysql_fetch_row( $result );
162
163   $query = "UPDATE HPCAnalysisResult SET ";
164
165   switch ( $action )
166   {
167      case "starting":
168         $query .= "queueStatus='running'," .
169                   "startTime=now(), ";
170         break;
171
172      case "aborted":
173         $query .= "queueStatus='aborted'," .
174                   "endTime=now(), ";
175         break;
176
177      case "finished":
178         $query .= "queueStatus='completed'," .
179                   "endTime=now(), ";
180         break;
181
182      default:
183         $query .= "queueStatus='running',";
184         break;
185   }
186
187   $query .= "lastMessage='" . mysql_real_escape_string( $message ) . "'" .
188             "WHERE HPCAnalysisResultID=$resultID";
189
190   mysql_query( $query, $resource );
191   mysql_close( $resource );
192}
193
194// Function to update the global database status
195function update_gfac( $gfacID, $status, $message )
196{
197  global $dbhost;
198  global $guser;
199  global $gpasswd;
200  global $gDB;
201  global $self;
202
203  $allowed_status = array( 'RUNNING',
204                           'UPDATING',
205                           'CANCELED',
206                           'COMPLETE'
207                         );
208
209  // Get data from global GFAC DB
210  $gLink = mysql_connect( $dbhost, $guser, $gpasswd );
211  if ( ! mysql_select_db( $gDB, $gLink ) )
212  {
213    write_log( "$self: Could not select DB $gDB" . mysql_error( $gLink ) );
214    return;
215  }
216
217  $status = strtoupper( $status );
218  if ( ! in_array( $status, $allowed_status ) )
219  {
220    write_log( "$self: update_gfac status $status not allowed" );
221    return;
222  }
223
224  // if 'UPDATING' then we're only updating the queue_messages table
225  if ( $status != 'UPDATING' )
226  {
227     $query = "UPDATE analysis SET status='$status' " .
228              "WHERE gfacID='$gfacID'";
229
230     mysql_query( $query, $gLink );
231  }
232
233  // Also update the queue_messages table
234  $query  = "SELECT id FROM analysis " .
235            "WHERE gfacID = '$gfacID'";
236  $result = mysql_query( $query, $gLink );
237  if ( ! $result )
238  {
239    write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
240    return;
241  }
242
243  if ( mysql_num_rows( $result ) == 0 )
244  {
245    write_log( "$self: can't find $gfacID in GFAC db" );
246    return;
247  }
248
249  list( $aID ) = mysql_fetch_array( $result );
250
251  $query  = "INSERT INTO queue_messages " .
252            "SET analysisID = $aID, " .
253            "message = '" . mysql_real_escape_string( $message ) . "'";
254  $result = mysql_query( $query, $gLink );
255  if ( ! $result )
256  {
257    write_log( "$self: bad query: $query " . mysql_error( $gLink ) );
258    return;
259  }
260
261  mysql_close( $gLink );
262}
263
264// function to notify GFAC that the UDP message "Finished" has arrived
265function notify_gfac_done( $gfacID )
266{
267  global $serviceURL;
268
269  $hex = "[0-9a-fA-F]";
270  if ( ! preg_match( "/^US3-Experiment/i", $gfacID ) &&
271       ! preg_match( "/^US3-$hex{8}-$hex{4}-$hex{4}-$hex{4}-$hex{12}$/", $gfacID ) )
272   {
273      // Then it's not a GFAC job
274      return false;
275   }
276
277   $url = "$serviceURL/setstatus/$gfacID";
278   try
279   {
280      $post = new HttpRequest( $url, HttpRequest::METH_GET );
281      $http = $post->send();
282      $xml  = $post->getResponseBody();     
283   }
284   catch ( HttpException $e )
285   {
286      write_log( "$self: Set status unsuccessful -  $gfacID" );
287      return false;
288   }
289
290   // Parse the result
291   // Not sure we need to know $gfac_status = parse_response( $xml );
292
293   // return $gfac_status;
294
295   return true;
296}
297?>
Note: See TracBrowser for help on using the repository browser.