- merge repomd branch
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description                                                          /*{{{*/
3 // $Id: acquire-worker.cc,v 1.1 2002/07/23 17:54:50 niemeyer Exp $
4 /* ######################################################################
5
6    Acquire Worker 
7
8    The worker process can startup either as a Configuration prober
9    or as a queue runner. As a configuration prober it only reads the
10    configuration message and has no owning queue to service.
11    
12    ##################################################################### */
13                                                                         /*}}}*/
14 // Include Files                                                        /*{{{*/
15 #ifdef __GNUG__
16 #pragma implementation "apt-pkg/acquire-worker.h"
17 #endif
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24
25 #include <apti18n.h>
26
27 #include <iostream>
28 #include <fstream>
29     
30 #include <sys/stat.h>
31 #include <unistd.h>
32 #include <fcntl.h>
33 #include <signal.h>
34 #include <stdio.h>
35 #include <errno.h>
36                                                                         /*}}}*/
37
38 using namespace std;
39
40 // Worker::Worker - Constructor for Queue startup                       /*{{{*/
41 // ---------------------------------------------------------------------
42 /* */
43 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
44                            pkgAcquireStatus *Log) : Log(Log)
45 {
46    OwnerQ = Q;
47    Config = Cnf;
48    Access = Cnf->Access;
49    CurrentItem = 0;
50    TotalSize = 0;
51    CurrentSize = 0;
52    
53    Construct();   
54 }
55                                                                         /*}}}*/
56 // Worker::Worker - Constructor for method config startup               /*{{{*/
57 // ---------------------------------------------------------------------
58 /* */
59 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
60 {
61    OwnerQ = 0;
62    Config = Cnf;
63    Access = Cnf->Access;
64    CurrentItem = 0;
65    TotalSize = 0;
66    CurrentSize = 0;
67    
68    Construct();   
69 }
70                                                                         /*}}}*/
71 // Worker::Construct - Constructor helper                               /*{{{*/
72 // ---------------------------------------------------------------------
73 /* */
74 void pkgAcquire::Worker::Construct()
75 {
76    NextQueue = 0;
77    NextAcquire = 0;
78    Process = -1;
79    InFd = -1;
80    OutFd = -1;
81    OutReady = false;
82    InReady = false;
83    Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
84 }
85                                                                         /*}}}*/
86 // Worker::~Worker - Destructor                                         /*{{{*/
87 // ---------------------------------------------------------------------
88 /* */
89 pkgAcquire::Worker::~Worker()
90 {
91    close(InFd);
92    close(OutFd);
93    
94    if (Process > 0)
95    {
96       /* Closing of stdin is the signal to exit and die when the process
97          indicates it needs cleanup */
98       if (Config->NeedsCleanup == false)
99          kill(Process,SIGINT);
100       ExecWait(Process,Access.c_str(),true);
101    }   
102 }
103                                                                         /*}}}*/
104 // Worker::Start - Start the worker process                             /*{{{*/
105 // ---------------------------------------------------------------------
106 /* This forks the method and inits the communication channel */
107 bool pkgAcquire::Worker::Start()
108 {
109    // Get the method path
110    string Method = _config->FindDir("Dir::Bin::Methods") + Access;
111    if (FileExists(Method) == false)
112       return _error->Error(_("The method driver %s could not be found."),Method.c_str());
113
114    if (Debug == true)
115       clog << "Starting method '" << Method << '\'' << endl;
116
117    // Create the pipes
118    int Pipes[4] = {-1,-1,-1,-1};
119    if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
120    {
121       _error->Errno("pipe","Failed to create IPC pipe to subprocess");
122       for (int I = 0; I != 4; I++)
123          close(Pipes[I]);
124       return false;
125    }
126    for (int I = 0; I != 4; I++)
127       SetCloseExec(Pipes[I],true);
128    
129    // Fork off the process
130    Process = ExecFork();
131    if (Process == 0)
132    {
133       // Setup the FDs
134       dup2(Pipes[1],STDOUT_FILENO);
135       dup2(Pipes[2],STDIN_FILENO);
136       SetCloseExec(STDOUT_FILENO,false);
137       SetCloseExec(STDIN_FILENO,false);      
138       SetCloseExec(STDERR_FILENO,false);
139       
140       const char *Args[2];
141       Args[0] = Method.c_str();
142       Args[1] = 0;
143       execv(Args[0],(char **)Args);
144       cerr << "Failed to exec method " << Args[0] << endl;
145       _exit(100);
146    }
147
148    // Fix up our FDs
149    InFd = Pipes[0];
150    OutFd = Pipes[3];
151    SetNonBlock(Pipes[0],true);
152    SetNonBlock(Pipes[3],true);
153    close(Pipes[1]);
154    close(Pipes[2]);
155    OutReady = false;
156    InReady = true;
157    
158    // Read the configuration data
159    if (WaitFd(InFd) == false ||
160        ReadMessages() == false)
161       return _error->Error(_("Method %s did not start correctly"),Method.c_str());
162
163    RunMessages();
164    if (OwnerQ != 0)
165       SendConfiguration();
166    
167    // CNC:2004-04-27
168    if (Config->HasPreferredURI == true &&
169        Config->DonePreferredURI == false &&
170        Config->PreferredURI.empty() == true) {
171       SetNonBlock(InFd,false);
172       SetNonBlock(OutFd,false);
173       OutQueue += "679 Preferred URI\n\n";
174       Config->PreferredURI = "<none>";
175       if (OutFdReady() == true)
176          while (InFdReady() == true && Config->PreferredURI == "<none>");
177       SetNonBlock(InFd,true);
178       SetNonBlock(OutFd,true);
179    }
180
181    return true;
182 }
183                                                                         /*}}}*/
184 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/
185 // ---------------------------------------------------------------------
186 /* */
187 bool pkgAcquire::Worker::ReadMessages()
188 {
189    if (::ReadMessages(InFd,MessageQueue) == false)
190       return MethodFailure();
191    return true;
192 }
193                                                                         /*}}}*/
194 // Worker::RunMessages - Empty the message queue                        /*{{{*/
195 // ---------------------------------------------------------------------
196 /* This takes the messages from the message queue and runs them through
197    the parsers in order. */
198 bool pkgAcquire::Worker::RunMessages()
199 {
200    while (MessageQueue.empty() == false)
201    {
202       string Message = MessageQueue.front();
203       MessageQueue.erase(MessageQueue.begin());
204
205       if (Debug == true)
206          clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
207       
208       // Fetch the message number
209       char *End;
210       int Number = strtol(Message.c_str(),&End,10);
211       if (End == Message.c_str())
212          return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
213
214       string URI = LookupTag(Message,"URI");
215       pkgAcquire::Queue::QItem *Itm = 0;
216       if (URI.empty() == false)
217          Itm = OwnerQ->FindItem(URI,this);
218       
219       // Determine the message number and dispatch
220       switch (Number)
221       {
222          // 100 Capabilities
223          case 100:
224          if (Capabilities(Message) == false)
225             return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
226          break;
227          
228          // 101 Log
229          case 101:
230          if (Debug == true)
231             clog << " <- (log) " << LookupTag(Message,"Message") << endl;
232          break;
233          
234          // 102 Status
235          case 102:
236          Status = LookupTag(Message,"Message");
237          break;
238
239          // CNC:2004-04-27
240          // 179 Preferred URI
241          case 179:
242          Config->PreferredURI = LookupTag(Message, "PreferredURI");
243          break;
244
245          // 103 Redirect
246          case 103:
247          {
248             if (Itm == 0)
249             {
250                _error->Error("Method gave invalid 103 Redirect message");
251                break;
252             }
253
254             string NewURI = LookupTag(Message,"New-URI",URI.c_str());
255             Itm->URI = NewURI;
256             break;
257          }
258             
259          // 200 URI Start
260          case 200:
261          {
262             if (Itm == 0)
263             {
264                _error->Error("Method gave invalid 200 URI Start message");
265                break;
266             }
267             
268             CurrentItem = Itm;
269             CurrentSize = 0;
270             TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
271             ResumePoint = atoi(LookupTag(Message,"Resume-Point","0").c_str());
272             Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
273
274             // Display update before completion
275             if (Log != 0 && Log->MorePulses == true)
276                Log->Pulse(Itm->Owner->GetOwner());
277             
278             if (Log != 0)
279                Log->Fetch(*Itm);
280
281             break;
282          }
283          
284          // 201 URI Done
285          case 201:
286          {
287             if (Itm == 0)
288             {
289                _error->Error("Method gave invalid 201 URI Done message");
290                break;
291             }
292             
293             pkgAcquire::Item *Owner = Itm->Owner;
294             pkgAcquire::ItemDesc Desc = *Itm;
295             
296             // Display update before completion
297             if (Log != 0 && Log->MorePulses == true)
298                Log->Pulse(Owner->GetOwner());
299             
300             OwnerQ->ItemDone(Itm);
301             if (TotalSize != 0 &&
302                 (unsigned)atoi(LookupTag(Message,"Size","0").c_str()) != TotalSize)
303                _error->Warning("Bizarre Error - File size is not what the server reported %s %lu",
304                                LookupTag(Message,"Size","0").c_str(),TotalSize);
305
306             // LORG:2006-03-09
307             // Look up the checksum type from owner
308             Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
309                         LookupTag(Message,Owner->ChecksumType().c_str()),Config);
310             
311             ItemDone();
312             
313             // Log that we are done
314             if (Log != 0)
315             {
316                if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
317                    StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
318                {
319                   /* Hide 'hits' for local only sources - we also manage to
320                      hide gets */
321                   if (Config->LocalOnly == false)
322                      Log->IMSHit(Desc);
323                }               
324                else
325                   Log->Done(Desc);
326             }
327             break;
328          }       
329          
330          // 400 URI Failure
331          case 400:
332          {
333             if (Itm == 0)
334             {
335                _error->Error("Method gave invalid 400 URI Failure message");
336                break;
337             }
338
339             // Display update before completion
340             if (Log != 0 && Log->MorePulses == true)
341                Log->Pulse(Itm->Owner->GetOwner());
342             
343             pkgAcquire::Item *Owner = Itm->Owner;
344             pkgAcquire::ItemDesc Desc = *Itm;
345             OwnerQ->ItemDone(Itm);
346             Owner->Failed(Message,Config);
347             ItemDone();
348
349             if (Log != 0)
350                Log->Fail(Desc);
351
352             break;
353          }       
354          
355          // 401 General Failure
356          case 401:
357          _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
358          break;
359          
360          // 403 Media Change
361          case 403:
362          MediaChange(Message); 
363          break;
364
365          // 404 Authenticate
366          case 404:
367          Authenticate(Message);
368          break;
369       }      
370    }
371    return true;
372 }
373                                                                         /*}}}*/
374 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/
375 // ---------------------------------------------------------------------
376 /* This parses the capabilities message and dumps it into the configuration
377    structure. */
378 bool pkgAcquire::Worker::Capabilities(string Message)
379 {
380    if (Config == 0)
381       return true;
382    
383    Config->Version = LookupTag(Message,"Version");
384    Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
385    Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
386    Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
387    Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
388    Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
389    Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
390    // CNC:2004-04-27
391    Config->HasPreferredURI = StringToBool(LookupTag(Message,"Has-Preferred-URI"),false);
392
393    // Some debug text
394    if (Debug == true)
395    {
396       clog << "Configured access method " << Config->Access << endl;
397       clog << "Version:" << Config->Version <<
398               " SingleInstance:" << Config->SingleInstance <<
399               " Pipeline:" << Config->Pipeline << 
400               " SendConfig:" << Config->SendConfig << 
401               " LocalOnly: " << Config->LocalOnly << 
402               " NeedsCleanup: " << Config->NeedsCleanup << 
403               // CNC:2004-04-27
404               " Removable: " << Config->Removable <<
405               " HasPreferredURI: " << Config->HasPreferredURI << endl;
406    }
407    
408    return true;
409 }
410                                                                         /*}}}*/
411 // Worker::MediaChange - Request a media change                         /*{{{*/
412 // ---------------------------------------------------------------------
413 /* */
414 bool pkgAcquire::Worker::MediaChange(string Message)
415 {
416    if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
417                                     LookupTag(Message,"Drive")) == false)
418    {
419       char S[300];
420       snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
421       if (Debug == true)
422          clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
423       OutQueue += S;
424       OutReady = true;
425       return true;
426    }
427
428    char S[300];
429    snprintf(S,sizeof(S),"603 Media Changed\n\n");
430    if (Debug == true)
431       clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
432    OutQueue += S;
433    OutReady = true;
434    return true;
435 }
436                                                                         /*}}}*/
437 // Worker::Authenticate - Request authentication                        /*{{{*/
438 // ---------------------------------------------------------------------
439 /* */
440 bool pkgAcquire::Worker::Authenticate(string Message)
441 {
442    string User, Pass;
443    if (Log == 0 || Log->Authenticate(LookupTag(Message,"Description"),
444                                      User,Pass) == false)
445    {
446       char S[300];
447       snprintf(S,sizeof(S),"604 Authenticated\nFailed: true\n\n");
448       if (Debug == true)
449          clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
450       OutQueue += S;
451       OutReady = true;
452       return true;
453    }
454
455    char S[300];
456    snprintf(S,sizeof(S),"604 Authenticated\nUser: %s\nPassword: %s\n\n",
457             User.c_str(), Pass.c_str());
458    if (Debug == true)
459       clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
460    OutQueue += S;
461    OutReady = true;
462    return true;
463 }
464                                                                         /*}}}*/
465 // Worker::SendConfiguration - Send the config to the method            /*{{{*/
466 // ---------------------------------------------------------------------
467 /* */
468 bool pkgAcquire::Worker::SendConfiguration()
469 {
470    if (Config->SendConfig == false)
471       return true;
472
473    if (OutFd == -1)
474       return false;
475    
476    string Message = "601 Configuration\n";
477    Message.reserve(2000);
478
479    /* Write out all of the configuration directives by walking the 
480       configuration tree */
481    const Configuration::Item *Top = _config->Tree(0);
482    for (; Top != 0;)
483    {
484       if (Top->Value.empty() == false)
485       {
486          string Line = "Config-Item: " + QuoteString(Top->FullTag(),"=\"\n") + "=";
487          Line += QuoteString(Top->Value,"\n") + '\n';
488          Message += Line;
489       }
490       
491       if (Top->Child != 0)
492       {
493          Top = Top->Child;
494          continue;
495       }
496       
497       while (Top != 0 && Top->Next == 0)
498          Top = Top->Parent;
499       if (Top != 0)
500          Top = Top->Next;
501    }   
502    Message += '\n';
503
504    if (Debug == true)
505       clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
506    OutQueue += Message;
507    OutReady = true; 
508    
509    return true;
510 }
511                                                                         /*}}}*/
512 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/
513 // ---------------------------------------------------------------------
514 /* Send a URI Acquire message to the method */
515 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
516 {
517    if (OutFd == -1)
518       return false;
519    
520    string Message = "600 URI Acquire\n";
521    Message.reserve(300);
522    Message += "URI: " + Item->URI;
523    Message += "\nFilename: " + Item->Owner->DestFile;
524    Message += Item->Owner->Custom600Headers();
525    Message += "\n\n";
526    
527    if (Debug == true)
528       clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
529    OutQueue += Message;
530    OutReady = true;
531    
532    return true;
533 }
534                                                                         /*}}}*/
535 // Worker::OutFdReady - Out bound FD is ready                          /*{{{*/
536 // ---------------------------------------------------------------------
537 /* */
538 bool pkgAcquire::Worker::OutFdReady()
539 {
540    int Res;
541    do
542    {
543       Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
544    }
545    while (Res < 0 && errno == EINTR);
546    
547    if (Res <= 0)
548       return MethodFailure();
549
550    // Hmm.. this should never happen.
551    if (Res < 0)
552       return true;
553    
554    OutQueue.erase(0,Res);
555    if (OutQueue.empty() == true)
556       OutReady = false;
557    
558    return true;
559 }
560                                                                         /*}}}*/
561 // Worker::InFdReady - In bound FD is ready                            /*{{{*/
562 // ---------------------------------------------------------------------
563 /* */
564 bool pkgAcquire::Worker::InFdReady()
565 {
566    if (ReadMessages() == false)
567       return false;
568    RunMessages();
569    return true;
570 }
571                                                                         /*}}}*/
572 // Worker::MethodFailure - Called when the method fails                 /*{{{*/
573 // ---------------------------------------------------------------------
574 /* This is called when the method is believed to have failed, probably because
575    read returned -1. */
576 bool pkgAcquire::Worker::MethodFailure()
577 {
578    _error->Error("Method %s has died unexpectedly!",Access.c_str());
579    
580    ExecWait(Process,Access.c_str(),true);
581    Process = -1;
582    close(InFd);
583    close(OutFd);
584    InFd = -1;
585    OutFd = -1;
586    OutReady = false;
587    InReady = false;
588    OutQueue = string();
589    MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
590    
591    return false;
592 }
593                                                                         /*}}}*/
594 // Worker::Pulse - Called periodically                                  /*{{{*/
595 // ---------------------------------------------------------------------
596 /* */
597 void pkgAcquire::Worker::Pulse()
598 {
599    if (CurrentItem == 0)
600       return;
601  
602    struct stat Buf;
603    if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
604       return;
605    CurrentSize = Buf.st_size;
606    
607    // Hmm? Should not happen...
608    if (CurrentSize > TotalSize && TotalSize != 0)
609       TotalSize = CurrentSize;
610 }
611                                                                         /*}}}*/
612 // Worker::ItemDone - Called when the current item is finished          /*{{{*/
613 // ---------------------------------------------------------------------
614 /* */
615 void pkgAcquire::Worker::ItemDone()
616 {
617    CurrentItem = 0;
618    CurrentSize = 0;
619    TotalSize = 0;
620    Status = string();
621 }
622                                                                         /*}}}*/
623
624 // vim:sts=3:sw=3