- initial import of revision 374 from cnc
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description                                                          /*{{{*/
3 // $Id: acquire-worker.cc,v 1.1 2002/07/23 17:54:50 niemeyer Exp $
4 /* ######################################################################
5
6    Acquire Worker 
7
8    The worker process can startup either as a Configuration prober
9    or as a queue runner. As a configuration prober it only reads the
10    configuration message and 
11    
12    ##################################################################### */
13                                                                         /*}}}*/
14 // Include Files                                                        /*{{{*/
15 #ifdef __GNUG__
16 #pragma implementation "apt-pkg/acquire-worker.h"
17 #endif
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24
25 #include <apti18n.h>
26
27 #include <iostream>
28 #include <fstream>
29     
30 #include <sys/stat.h>
31 #include <unistd.h>
32 #include <fcntl.h>
33 #include <signal.h>
34 #include <stdio.h>
35 #include <errno.h>
36                                                                         /*}}}*/
37
38 using namespace std;
39
40 // Worker::Worker - Constructor for Queue startup                       /*{{{*/
41 // ---------------------------------------------------------------------
42 /* */
43 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
44                            pkgAcquireStatus *Log) : Log(Log)
45 {
46    OwnerQ = Q;
47    Config = Cnf;
48    Access = Cnf->Access;
49    CurrentItem = 0;
50    TotalSize = 0;
51    CurrentSize = 0;
52    
53    Construct();   
54 }
55                                                                         /*}}}*/
56 // Worker::Worker - Constructor for method config startup               /*{{{*/
57 // ---------------------------------------------------------------------
58 /* */
59 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
60 {
61    OwnerQ = 0;
62    Config = Cnf;
63    Access = Cnf->Access;
64    CurrentItem = 0;
65    TotalSize = 0;
66    CurrentSize = 0;
67    
68    Construct();   
69 }
70                                                                         /*}}}*/
71 // Worker::Construct - Constructor helper                               /*{{{*/
72 // ---------------------------------------------------------------------
73 /* */
74 void pkgAcquire::Worker::Construct()
75 {
76    NextQueue = 0;
77    NextAcquire = 0;
78    Process = -1;
79    InFd = -1;
80    OutFd = -1;
81    OutReady = false;
82    InReady = false;
83    Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
84 }
85                                                                         /*}}}*/
86 // Worker::~Worker - Destructor                                         /*{{{*/
87 // ---------------------------------------------------------------------
88 /* */
89 pkgAcquire::Worker::~Worker()
90 {
91    close(InFd);
92    close(OutFd);
93    
94    if (Process > 0)
95    {
96       /* Closing of stdin is the signal to exit and die when the process
97          indicates it needs cleanup */
98       if (Config->NeedsCleanup == false)
99          kill(Process,SIGINT);
100       ExecWait(Process,Access.c_str(),true);
101    }   
102 }
103                                                                         /*}}}*/
104 // Worker::Start - Start the worker process                             /*{{{*/
105 // ---------------------------------------------------------------------
106 /* This forks the method and inits the communication channel */
107 bool pkgAcquire::Worker::Start()
108 {
109    // Get the method path
110    string Method = _config->FindDir("Dir::Bin::Methods") + Access;
111    if (FileExists(Method) == false)
112       return _error->Error(_("The method driver %s could not be found."),Method.c_str());
113
114    if (Debug == true)
115       clog << "Starting method '" << Method << '\'' << endl;
116
117    // Create the pipes
118    int Pipes[4] = {-1,-1,-1,-1};
119    if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
120    {
121       _error->Errno("pipe","Failed to create IPC pipe to subprocess");
122       for (int I = 0; I != 4; I++)
123          close(Pipes[I]);
124       return false;
125    }
126    for (int I = 0; I != 4; I++)
127       SetCloseExec(Pipes[I],true);
128    
129    // Fork off the process
130    Process = ExecFork();
131    if (Process == 0)
132    {
133       // Setup the FDs
134       dup2(Pipes[1],STDOUT_FILENO);
135       dup2(Pipes[2],STDIN_FILENO);
136       SetCloseExec(STDOUT_FILENO,false);
137       SetCloseExec(STDIN_FILENO,false);      
138       SetCloseExec(STDERR_FILENO,false);
139       
140       const char *Args[2];
141       Args[0] = Method.c_str();
142       Args[1] = 0;
143       execv(Args[0],(char **)Args);
144       cerr << "Failed to exec method " << Args[0] << endl;
145       _exit(100);
146    }
147
148    // Fix up our FDs
149    InFd = Pipes[0];
150    OutFd = Pipes[3];
151    SetNonBlock(Pipes[0],true);
152    SetNonBlock(Pipes[3],true);
153    close(Pipes[1]);
154    close(Pipes[2]);
155    OutReady = false;
156    InReady = true;
157    
158    // Read the configuration data
159    if (WaitFd(InFd) == false ||
160        ReadMessages() == false)
161       return _error->Error(_("Method %s did not start correctly"),Method.c_str());
162
163    RunMessages();
164    if (OwnerQ != 0)
165       SendConfiguration();
166    
167    // CNC:2004-04-27
168    if (Config->HasPreferredURI == true &&
169        Config->DonePreferredURI == false &&
170        Config->PreferredURI.empty() == true) {
171       SetNonBlock(InFd,false);
172       SetNonBlock(OutFd,false);
173       OutQueue += "679 Preferred URI\n\n";
174       Config->PreferredURI = "<none>";
175       if (OutFdReady() == true)
176          while (InFdReady() == true && Config->PreferredURI == "<none>");
177       SetNonBlock(InFd,true);
178       SetNonBlock(OutFd,true);
179    }
180
181    return true;
182 }
183                                                                         /*}}}*/
184 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/
185 // ---------------------------------------------------------------------
186 /* */
187 bool pkgAcquire::Worker::ReadMessages()
188 {
189    if (::ReadMessages(InFd,MessageQueue) == false)
190       return MethodFailure();
191    return true;
192 }
193                                                                         /*}}}*/
194 // Worker::RunMessage - Empty the message queue                         /*{{{*/
195 // ---------------------------------------------------------------------
196 /* This takes the messages from the message queue and runs them through
197    the parsers in order. */
198 bool pkgAcquire::Worker::RunMessages()
199 {
200    while (MessageQueue.empty() == false)
201    {
202       string Message = MessageQueue.front();
203       MessageQueue.erase(MessageQueue.begin());
204
205       if (Debug == true)
206          clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
207       
208       // Fetch the message number
209       char *End;
210       int Number = strtol(Message.c_str(),&End,10);
211       if (End == Message.c_str())
212          return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
213
214       string URI = LookupTag(Message,"URI");
215       pkgAcquire::Queue::QItem *Itm = 0;
216       if (URI.empty() == false)
217          Itm = OwnerQ->FindItem(URI,this);
218       
219       // Determine the message number and dispatch
220       switch (Number)
221       {
222          // 100 Capabilities
223          case 100:
224          if (Capabilities(Message) == false)
225             return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
226          break;
227          
228          // 101 Log
229          case 101:
230          if (Debug == true)
231             clog << " <- (log) " << LookupTag(Message,"Message") << endl;
232          break;
233          
234          // 102 Status
235          case 102:
236          Status = LookupTag(Message,"Message");
237          break;
238
239          // CNC:2004-04-27
240          // 179 Preferred URI
241          case 179:
242          Config->PreferredURI = LookupTag(Message, "PreferredURI");
243          break;
244             
245          // 200 URI Start
246          case 200:
247          {
248             if (Itm == 0)
249             {
250                _error->Error("Method gave invalid 200 URI Start message");
251                break;
252             }
253             
254             CurrentItem = Itm;
255             CurrentSize = 0;
256             TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
257             ResumePoint = atoi(LookupTag(Message,"Resume-Point","0").c_str());
258             Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
259
260             // Display update before completion
261             if (Log != 0 && Log->MorePulses == true)
262                Log->Pulse(Itm->Owner->GetOwner());
263             
264             if (Log != 0)
265                Log->Fetch(*Itm);
266
267             break;
268          }
269          
270          // 201 URI Done
271          case 201:
272          {
273             if (Itm == 0)
274             {
275                _error->Error("Method gave invalid 201 URI Done message");
276                break;
277             }
278             
279             pkgAcquire::Item *Owner = Itm->Owner;
280             pkgAcquire::ItemDesc Desc = *Itm;
281             
282             // Display update before completion
283             if (Log != 0 && Log->MorePulses == true)
284                Log->Pulse(Owner->GetOwner());
285             
286             OwnerQ->ItemDone(Itm);
287             if (TotalSize != 0 &&
288                 (unsigned)atoi(LookupTag(Message,"Size","0").c_str()) != TotalSize)
289                _error->Warning("Bizarre Error - File size is not what the server reported %s %lu",
290                                LookupTag(Message,"Size","0").c_str(),TotalSize);
291
292             Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
293                         LookupTag(Message,"MD5-Hash"),Config);
294             ItemDone();
295             
296             // Log that we are done
297             if (Log != 0)
298             {
299                if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
300                    StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
301                {
302                   /* Hide 'hits' for local only sources - we also manage to
303                      hide gets */
304                   if (Config->LocalOnly == false)
305                      Log->IMSHit(Desc);
306                }               
307                else
308                   Log->Done(Desc);
309             }
310             break;
311          }       
312          
313          // 400 URI Failure
314          case 400:
315          {
316             if (Itm == 0)
317             {
318                _error->Error("Method gave invalid 400 URI Failure message");
319                break;
320             }
321
322             // Display update before completion
323             if (Log != 0 && Log->MorePulses == true)
324                Log->Pulse(Itm->Owner->GetOwner());
325             
326             pkgAcquire::Item *Owner = Itm->Owner;
327             pkgAcquire::ItemDesc Desc = *Itm;
328             OwnerQ->ItemDone(Itm);
329             Owner->Failed(Message,Config);
330             ItemDone();
331
332             if (Log != 0)
333                Log->Fail(Desc);
334
335             break;
336          }       
337          
338          // 401 General Failure
339          case 401:
340          _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
341          break;
342          
343          // 403 Media Change
344          case 403:
345          MediaChange(Message); 
346          break;
347       }      
348    }
349    return true;
350 }
351                                                                         /*}}}*/
352 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/
353 // ---------------------------------------------------------------------
354 /* This parses the capabilities message and dumps it into the configuration
355    structure. */
356 bool pkgAcquire::Worker::Capabilities(string Message)
357 {
358    if (Config == 0)
359       return true;
360    
361    Config->Version = LookupTag(Message,"Version");
362    Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
363    Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
364    Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
365    Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
366    Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
367    Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
368    // CNC:2004-04-27
369    Config->HasPreferredURI = StringToBool(LookupTag(Message,"Has-Preferred-URI"),false);
370
371    // Some debug text
372    if (Debug == true)
373    {
374       clog << "Configured access method " << Config->Access << endl;
375       clog << "Version:" << Config->Version <<
376               " SingleInstance:" << Config->SingleInstance <<
377               " Pipeline:" << Config->Pipeline << 
378               " SendConfig:" << Config->SendConfig << 
379               " LocalOnly: " << Config->LocalOnly << 
380               " NeedsCleanup: " << Config->NeedsCleanup << 
381               // CNC:2004-04-27
382               " Removable: " << Config->Removable <<
383               " HasPreferredURI: " << Config->HasPreferredURI << endl;
384    }
385    
386    return true;
387 }
388                                                                         /*}}}*/
389 // Worker::MediaChange - Request a media change                         /*{{{*/
390 // ---------------------------------------------------------------------
391 /* */
392 bool pkgAcquire::Worker::MediaChange(string Message)
393 {
394    if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
395                                     LookupTag(Message,"Drive")) == false)
396    {
397       char S[300];
398       snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
399       if (Debug == true)
400          clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
401       OutQueue += S;
402       OutReady = true;
403       return true;
404    }
405
406    char S[300];
407    snprintf(S,sizeof(S),"603 Media Changed\n\n");
408    if (Debug == true)
409       clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
410    OutQueue += S;
411    OutReady = true;
412    return true;
413 }
414                                                                         /*}}}*/
415 // Worker::SendConfiguration - Send the config to the method            /*{{{*/
416 // ---------------------------------------------------------------------
417 /* */
418 bool pkgAcquire::Worker::SendConfiguration()
419 {
420    if (Config->SendConfig == false)
421       return true;
422
423    if (OutFd == -1)
424       return false;
425    
426    string Message = "601 Configuration\n";
427    Message.reserve(2000);
428
429    /* Write out all of the configuration directives by walking the 
430       configuration tree */
431    const Configuration::Item *Top = _config->Tree(0);
432    for (; Top != 0;)
433    {
434       if (Top->Value.empty() == false)
435       {
436          string Line = "Config-Item: " + QuoteString(Top->FullTag(),"=\"\n") + "=";
437          Line += QuoteString(Top->Value,"\n") + '\n';
438          Message += Line;
439       }
440       
441       if (Top->Child != 0)
442       {
443          Top = Top->Child;
444          continue;
445       }
446       
447       while (Top != 0 && Top->Next == 0)
448          Top = Top->Parent;
449       if (Top != 0)
450          Top = Top->Next;
451    }   
452    Message += '\n';
453
454    if (Debug == true)
455       clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
456    OutQueue += Message;
457    OutReady = true; 
458    
459    return true;
460 }
461                                                                         /*}}}*/
462 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/
463 // ---------------------------------------------------------------------
464 /* Send a URI Acquire message to the method */
465 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
466 {
467    if (OutFd == -1)
468       return false;
469    
470    string Message = "600 URI Acquire\n";
471    Message.reserve(300);
472    Message += "URI: " + Item->URI;
473    Message += "\nFilename: " + Item->Owner->DestFile;
474    Message += Item->Owner->Custom600Headers();
475    Message += "\n\n";
476    
477    if (Debug == true)
478       clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
479    OutQueue += Message;
480    OutReady = true;
481    
482    return true;
483 }
484                                                                         /*}}}*/
485 // Worker::OutFdRead - Out bound FD is ready                            /*{{{*/
486 // ---------------------------------------------------------------------
487 /* */
488 bool pkgAcquire::Worker::OutFdReady()
489 {
490    int Res;
491    do
492    {
493       Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
494    }
495    while (Res < 0 && errno == EINTR);
496    
497    if (Res <= 0)
498       return MethodFailure();
499
500    // Hmm.. this should never happen.
501    if (Res < 0)
502       return true;
503    
504    OutQueue.erase(0,Res);
505    if (OutQueue.empty() == true)
506       OutReady = false;
507    
508    return true;
509 }
510                                                                         /*}}}*/
511 // Worker::InFdRead - In bound FD is ready                              /*{{{*/
512 // ---------------------------------------------------------------------
513 /* */
514 bool pkgAcquire::Worker::InFdReady()
515 {
516    if (ReadMessages() == false)
517       return false;
518    RunMessages();
519    return true;
520 }
521                                                                         /*}}}*/
522 // Worker::MethodFailure - Called when the method fails                 /*{{{*/
523 // ---------------------------------------------------------------------
524 /* This is called when the method is belived to have failed, probably because
525    read returned -1. */
526 bool pkgAcquire::Worker::MethodFailure()
527 {
528    _error->Error("Method %s has died unexpectedly!",Access.c_str());
529    
530    ExecWait(Process,Access.c_str(),true);
531    Process = -1;
532    close(InFd);
533    close(OutFd);
534    InFd = -1;
535    OutFd = -1;
536    OutReady = false;
537    InReady = false;
538    OutQueue = string();
539    MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
540    
541    return false;
542 }
543                                                                         /*}}}*/
544 // Worker::Pulse - Called periodically                                  /*{{{*/
545 // ---------------------------------------------------------------------
546 /* */
547 void pkgAcquire::Worker::Pulse()
548 {
549    if (CurrentItem == 0)
550       return;
551  
552    struct stat Buf;
553    if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
554       return;
555    CurrentSize = Buf.st_size;
556    
557    // Hmm? Should not happen...
558    if (CurrentSize > TotalSize && TotalSize != 0)
559       TotalSize = CurrentSize;
560 }
561                                                                         /*}}}*/
562 // Worker::ItemDone - Called when the current item is finished          /*{{{*/
563 // ---------------------------------------------------------------------
564 /* */
565 void pkgAcquire::Worker::ItemDone()
566 {
567    CurrentItem = 0;
568    CurrentSize = 0;
569    TotalSize = 0;
570    Status = string();
571 }
572                                                                         /*}}}*/
573
574 // vim:sts=3:sw=3