XRootD
XrdClCopy.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdApps/XrdCpConfig.hh"
26 #include "XrdApps/XrdCpFile.hh"
27 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
30 #include "XrdCl/XrdClLog.hh"
31 #include "XrdCl/XrdClFileSystem.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClDlgEnv.hh"
34 #include "XrdCl/XrdClOptimizers.hh"
35 #include "XrdSys/XrdSysE2T.hh"
36 #include "XrdSys/XrdSysPthread.hh"
38 
39 #include <cstdio>
40 #include <iostream>
41 #include <iomanip>
42 #include <limits>
43 
44 //------------------------------------------------------------------------------
45 // Progress notifier
46 //------------------------------------------------------------------------------
48 {
49  public:
50  //--------------------------------------------------------------------------
52  //--------------------------------------------------------------------------
53  ProgressDisplay(): pPrevious(0), pPrintProgressBar(true),
54  pPrintSourceCheckSum(false), pPrintTargetCheckSum(false),
55  pPrintAdditionalCheckSum(false)
56  {}
57 
58  //--------------------------------------------------------------------------
60  //--------------------------------------------------------------------------
61  virtual void BeginJob( uint16_t jobNum,
62  uint16_t jobTotal,
63  const XrdCl::URL *source,
64  const XrdCl::URL *destination )
65  {
66  XrdSysMutexHelper scopedLock( pMutex );
67  if( pPrintProgressBar )
68  {
69  if( jobTotal > 1 )
70  {
71  std::cerr << "Job: " << jobNum << "/" << jobTotal << std::endl;
72  std::cerr << "Source: " << source->GetURL() << std::endl;
73  std::cerr << "Target: " << destination->GetURL() << std::endl;
74  }
75  }
76  pPrevious = 0;
77 
78  JobData d;
79  d.started = time(0);
80  d.source = source;
81  d.target = destination;
82  pOngoingJobs[jobNum] = d;
83  }
84 
85  //--------------------------------------------------------------------------
87  //--------------------------------------------------------------------------
// End job: print the final progress line of the job, then - if the job's
// "status" result is OK - print the check sums that were requested through
// the Print*CheckSum flags, and stop tracking the job. A job number that is
// not being tracked is ignored.
//
// jobNum  - number of the job that has finished
// results - property list the "status", "size" and check sum values are
//           read from
88  virtual void EndJob( uint16_t jobNum, const XrdCl::PropertyList *results )
89  {
90  XrdSysMutexHelper scopedLock( pMutex );
91 
92  std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
93  if( it == pOngoingJobs.end() )
94  return;
95 
96  JobData &d = it->second;
97 
98  // make sure the last available status was printed, which may not be
99  // the case when processing stdio since we throttle printing and don't
100  // know the total size
  // NOTE(review): JobProgress takes pMutex again while it is held here;
  // pMutex is declared as XrdSysRecMutex, presumably a recursive mutex -
  // confirm
101  JobProgress( jobNum, d.bytesProcessed, d.bytesTotal );
102 
  // with several jobs being tracked the status line is blanked out,
  // with a single job it is terminated with a new line
103  if( pPrintProgressBar )
104  {
105  if( pOngoingJobs.size() > 1 )
106  std::cerr << "\r" << std::string(70, ' ') << "\r";
107  else
108  std::cerr << std::endl;
109  }
110 
  // NOTE(review): `st` is not declared in the visible part of this method;
  // the listing line numbered 111 is absent, presumably it held the
  // declaration - confirm against the original file
112  results->Get( "status", st );
  // a failed job is dropped without printing any check sum
113  if( !st.IsOK() )
114  {
115  pOngoingJobs.erase(it);
116  return;
117  }
118 
119  std::string checkSum;
  // NOTE(review): size is not initialized here; it holds a defined value
  // only if Get() stores into it - confirm that "size" is always present
120  uint64_t size;
121  results->Get( "size", size );
122  if( pPrintSourceCheckSum )
123  {
124  results->Get( "sourceCheckSum", checkSum );
125  PrintCheckSum( d.source, checkSum, size );
126  }
127 
128  if( pPrintTargetCheckSum )
129  {
130  results->Get( "targetCheckSum", checkSum );
131  PrintCheckSum( d.target, checkSum, size );
132  }
133 
134  if( pPrintAdditionalCheckSum )
135  {
136  std::vector<std::string> addcksums;
  // NOTE(review): the key is spelled "additionalCkeckSum"; presumably this
  // matches the spelling used where the results are filled in - verify
  // before changing it
137  results->Get( "additionalCkeckSum", addcksums );
  // the additional check sums are reported against the source URL
138  for( auto &cks : addcksums )
139  PrintCheckSum( d.source, cks, size );
140  }
141 
142  pOngoingJobs.erase(it);
143  }
144 
145  //--------------------------------------------------------------------------
147  //--------------------------------------------------------------------------
148  std::string GetProgressBar( time_t now )
149  {
150  JobData &d = pOngoingJobs.begin()->second;
151 
152  uint64_t speed = 0;
153  if( now-d.started )
154  speed = d.bytesProcessed/(now-d.started);
155  else
156  speed = d.bytesProcessed;
157 
158  std::string bar;
159  int prog = 0;
160  int proc = 0;
161 
162  if( d.bytesTotal )
163  {
164  prog = (int)((double)d.bytesProcessed/d.bytesTotal*50);
165  proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
166  }
167  else
168  {
169  prog = 50;
170  proc = 100;
171  }
172  bar.append( prog, '=' );
173  if( prog < 50 )
174  bar += ">";
175 
176  std::ostringstream o;
177  o << "[" << XrdCl::Utils::BytesToString(d.bytesProcessed) << "B/";
178  o << XrdCl::Utils::BytesToString(d.bytesTotal) << "B]";
179  o << "[" << std::setw(3) << std::right << proc << "%]";
180  o << "[" << std::setw(50) << std::left;
181  o << bar;
182  o << "]";
183  o << "[" << XrdCl::Utils::BytesToString(speed) << "B/s] ";
184  return o.str();
185  }
186 
187  //--------------------------------------------------------------------------
189  //--------------------------------------------------------------------------
190  std::string GetSummaryBar( time_t now )
191  {
192  std::map<uint16_t, JobData>::iterator it;
193  std::ostringstream o;
194 
195  for( it = pOngoingJobs.begin(); it != pOngoingJobs.end(); ++it )
196  {
197  JobData &d = it->second;
198  uint16_t jobNum = it->first;
199 
200  uint64_t speed = 0;
201  if( now-d.started )
202  speed = d.bytesProcessed/(now-d.started);
203 
204  int proc = 0;
205  if( d.bytesTotal )
206  proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
207  else
208  proc = 100;
209 
210  o << "[#" << jobNum << ": ";
211  o << proc << "% ";
212  o << XrdCl::Utils::BytesToString(speed) << "B/s] ";
213  }
214  o << " ";
215  return o.str();
216  }
217 
218  //--------------------------------------------------------------------------
220  //--------------------------------------------------------------------------
221  virtual void JobProgress( uint16_t jobNum,
222  uint64_t bytesProcessed,
223  uint64_t bytesTotal )
224  {
225  XrdSysMutexHelper scopedLock( pMutex );
226 
227  if( pPrintProgressBar )
228  {
229  time_t now = time(0);
230  if( (now - pPrevious < 1) && (bytesProcessed != bytesTotal) )
231  return;
232  pPrevious = now;
233 
234  std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
235  if( it == pOngoingJobs.end() )
236  return;
237 
238  JobData &d = it->second;
239 
240  d.bytesProcessed = bytesProcessed;
241  d.bytesTotal = bytesTotal;
242 
243  std::string progress;
244  if( pOngoingJobs.size() == 1 )
245  progress = GetProgressBar( now );
246  else
247  progress = GetSummaryBar( now );
248 
249  std::cerr << "\r" << progress << std::flush;
250  }
251  }
252 
253  //--------------------------------------------------------------------------
255  //--------------------------------------------------------------------------
256  void PrintCheckSum( const XrdCl::URL *url,
257  const std::string &checkSum,
258  uint64_t size )
259  {
260  if( checkSum.empty() )
261  return;
262  std::string::size_type i = checkSum.find( ':' );
263  std::cerr << checkSum.substr( 0, i+1 ) << " ";
264  std::cerr << checkSum.substr( i+1, checkSum.length()-i ) << " ";
265 
266  if( url->IsLocalFile() )
267  std::cerr << url->GetPath() << " ";
268  else
269  {
270  std::cerr << url->GetProtocol() << "://" << url->GetHostId();
271  std::cerr << url->GetPath() << " ";
272  }
273 
274  std::cerr << size;
275  std::cerr << std::endl;
276  }
277 
278  //--------------------------------------------------------------------------
279  // Printing flags
280  //--------------------------------------------------------------------------
281  void PrintProgressBar( bool print ) { pPrintProgressBar = print; }
282  void PrintSourceCheckSum( bool print ) { pPrintSourceCheckSum = print; }
283  void PrintTargetCheckSum( bool print ) { pPrintTargetCheckSum = print; }
284  void PrintAdditionalCheckSum( bool print ) { pPrintAdditionalCheckSum = print; }
285 
286  private:
287  struct JobData
288  {
289  JobData(): bytesProcessed(0), bytesTotal(0),
290  started(0), source(0), target(0) {}
291  uint64_t bytesProcessed;
292  uint64_t bytesTotal;
293  time_t started;
294  const XrdCl::URL *source;
295  const XrdCl::URL *target;
296  };
297 
298  time_t pPrevious;
299  bool pPrintProgressBar;
300  bool pPrintSourceCheckSum;
301  bool pPrintTargetCheckSum;
302  bool pPrintAdditionalCheckSum;
303  std::map<uint16_t, JobData> pOngoingJobs;
304  XrdSysRecMutex pMutex;
305 };
306 
307 //------------------------------------------------------------------------------
308 // Check if we support all the specified user options
309 //------------------------------------------------------------------------------
311 {
  // Returns false, after printing a diagnostic to stderr, when
  // config->pHost is set, and true otherwise.
  // NOTE(review): judging by the message below pHost is presumably the
  // SOCKS proxy host given on the command line - confirm in XrdCpConfig
312  if( config->pHost )
313  {
314  std::cerr << "SOCKS Proxies are not yet supported" << std::endl;
315  return false;
316  }
317 
318  return true;
319 }
320 
321 //------------------------------------------------------------------------------
322 // Append extra cgi info to existing URL
323 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//! Append extra CGI information to a URL, in place
//!
//! A '?' is added if the URL has none yet and the new information is always
//! separated from what is already there by exactly one '&'. Previously the
//! separator was added only if the URL contained no '&' at all, so a URL
//! already carrying two or more parameters got the new information glued to
//! its last value ("...?x=1&y=2" + "a=b" gave "...?x=1&y=2a=b").
//!
//! @param url    URL to be extended
//! @param newCGI information to append; null or empty leaves the URL
//!               untouched, a single leading '&' is skipped
//------------------------------------------------------------------------------
void AppendCGI( std::string &url, const char *newCGI )
{
  if( !newCGI || !(*newCGI) )
    return;

  // the separator is handled below, drop the one the caller may have given
  if( *newCGI == '&' )
    ++newCGI;

  if( url.find( '?' ) == std::string::npos )
    url += '?';

  // the URL is not empty at this point (it holds at least the '?')
  if( url[url.size() - 1] != '&' )
    url += '&';

  url += newCGI;
}
340 
341 //------------------------------------------------------------------------------
342 // Process commandline environment settings
343 //------------------------------------------------------------------------------
345 {
  // Copies every integer and string definition held by config into the
  // environment object `env`.
  // NOTE(review): `env` is not declared in the visible part of this
  // function; the listing line numbered 346 is absent, presumably it held
  // the declaration - confirm against the original file
347 
  // intDefs is a linked list chained through Next
348  XrdCpConfig::defVar *cursor = config->intDefs;
349  while( cursor )
350  {
351  env->PutInt( cursor->vName, cursor->intVal );
352  cursor = cursor->Next;
353  }
354 
  // same walk over the string definitions
355  cursor = config->strDefs;
356  while( cursor )
357  {
358  env->PutString( cursor->vName, cursor->strVal );
359  cursor = cursor->Next;
360  }
361 }
362 
363 //------------------------------------------------------------------------------
364 // Translate file type to a string for diagnostics purposes
365 //------------------------------------------------------------------------------
367 {
  // Maps the XrdCpFile protocol type to a string literal; any value that
  // is not listed below is reported as "other".
368  switch( type )
369  {
370  case XrdCpFile::isDir: return "directory";
371  case XrdCpFile::isFile: return "local file";
372  case XrdCpFile::isXroot: return "xroot";
373  case XrdCpFile::isXroots: return "xroots";
374  case XrdCpFile::isHttp: return "http";
375  case XrdCpFile::isHttps: return "https";
376  case XrdCpFile::isStdIO: return "stdio";
377  default: return "other";
  // the ';' after the closing brace below is an empty statement
378  };
379 }
380 
381 //------------------------------------------------------------------------------
382 // Count the sources
383 //------------------------------------------------------------------------------
384 uint32_t CountSources( XrdCpFile *file )
385 {
386  uint32_t count;
387  for( count = 0; file; file = file->Next, ++count ) {};
388  return count;
389 }
390 
391 //------------------------------------------------------------------------------
392 // Adjust file information for the cases when XrdCpConfig cannot do this
393 //------------------------------------------------------------------------------
395 {
396  //----------------------------------------------------------------------------
397  // If the file is url and the directory offset is not set we set it
398  // to the last slash
399  //----------------------------------------------------------------------------
400  if( file->Doff == 0 )
401  {
402  char *slash = file->Path;
  // move to the terminating null character of the path ...
403  for( ; *slash; ++slash ) {};
  // ... then walk back to the last '/', stopping at the start of the
  // path if there is none
404  for( ; *slash != '/' && slash > file->Path; --slash ) {};
  // offset of the last '/'; it stays 0 when the path contains no '/'
  // or when its only '/' is the first character
405  file->Doff = slash - file->Path;
406  }
  // the ';' after the closing brace below is an empty declaration
407 };
408 
409 //------------------------------------------------------------------------------
410 // Recursively index all files and directories inside a remote directory
411 //------------------------------------------------------------------------------
// NOTE(review): the first line of the signature (listing line 412) is absent
// from this listing; the visible parameters are the base path to index and
// the directory offset that is stored in every created entry.
413  std::string basePath,
414  long dirOffset )
415 {
416  using namespace XrdCl;
417 
418  Log *log = DefaultEnv::GetLog();
419  log->Debug( AppMsg, "Indexing %s", basePath.c_str() );
420 
421  DirectoryList *dirList = 0;
  // NOTE(review): the call below is cut short, the listing line numbered
  // 423 with the remaining arguments is absent - confirm against the
  // original file
422  XRootDStatus st = fs->DirList( URL( basePath ).GetPath(), DirListFlags::Recursive
  // a failed listing is logged at Info level and reported as a null result
424  if( !st.IsOK() )
425  {
426  log->Info( AppMsg, "Failed to get directory listing for %s: %s",
427  basePath.c_str(),
428  st.GetErrorMessage().c_str() );
429  return 0;
430  }
431 
  // `start` is a dummy head: new entries are linked behind `end` and the
  // list that is finally returned begins at start.Next
432  XrdCpFile start, *current = 0;
433  XrdCpFile *end = &start;
434  int badUrl = 0;
435  for( auto itr = dirList->Begin(); itr != dirList->End(); ++itr )
436  {
437  DirectoryList::ListEntry *e = *itr;
  // only non-directory entries become list elements
438  if( e->GetStatInfo()->TestFlags( StatInfo::IsDir ) )
439  continue;
440  std::string path = basePath + '/' + e->GetName();
441  current = new XrdCpFile( path.c_str(), badUrl );
442  if( badUrl )
443  {
444  log->Error( AppMsg, "Bad URL: %s", current->Path );
445  delete current;
  // NOTE(review): this return skips the `delete dirList` below and does
  // not delete the entries already linked behind `start`, which looks
  // like a leak on this path - confirm
446  return 0;
447  }
448 
449  current->Doff = dirOffset;
450  end->Next = current;
451  end = current;
452  }
453 
454  delete dirList;
455 
  // null when the listing contained no non-directory entries
456  return start.Next;
457 }
458 
459 //------------------------------------------------------------------------------
460 // Clean up the copy job descriptors
461 //------------------------------------------------------------------------------
462 void CleanUpResults( std::vector<XrdCl::PropertyList *> &results )
463 {
464  std::vector<XrdCl::PropertyList *>::iterator it;
465  for( it = results.begin(); it != results.end(); ++it )
466  delete *it;
467 }
468 
469 //--------------------------------------------------------------------------
470 // Let the show begin
471 //------------------------------------------------------------------------------
// Entry point of the copy command: parses the command line into an
// XrdCpConfig, translates the requested options into copy job properties,
// adds one job per source to a CopyProcess, runs the process and reports
// the outcome. Returns 0 on success, 50 for the errors detected here, or
// the shell code of the failing XRootDStatus.
//
// NOTE(review): several lines of this function are absent from this listing
// (gaps in the embedded line numbers at 528, 532, 621 and 717); each gap is
// marked below - confirm against the original file.
472 int main( int argc, char **argv )
473 {
474  using namespace XrdCl;
475 
476  //----------------------------------------------------------------------------
477  // Configure the copy command, if it returns then everything went well, ugly
478  //----------------------------------------------------------------------------
479  XrdCpConfig config( argv[0] );
480  config.Config( argc, argv, XrdCpConfig::optRmtRec );
481  if( !AllOptionsSupported( &config ) )
482  return 50; // generic error
483  ProcessCommandLineEnv( &config );
484 
485  //----------------------------------------------------------------------------
486  // Set options
487  //----------------------------------------------------------------------------
488  CopyProcess process;
489  Log *log = DefaultEnv::GetLog();
  // debug levels 1, 2 and 3 select Info, Debug and Dump logging; any other
  // non-zero value leaves the log level unchanged
490  if( config.Dlvl )
491  {
492  if( config.Dlvl == 1 ) log->SetLevel( Log::InfoMsg );
493  else if( config.Dlvl == 2 ) log->SetLevel( Log::DebugMsg );
494  else if( config.Dlvl == 3 ) log->SetLevel( Log::DumpMsg );
495  }
496 
497  ProgressDisplay progress;
  // NOTE(review): the terminal check is made on stdout while the progress
  // display in this file writes to std::cerr - confirm this is intended
498  if( config.Want(XrdCpConfig::DoNoPbar) || !isatty( fileno( stdout ) ) )
499  progress.PrintProgressBar( false );
500 
501  bool posc = false;
502  bool force = false;
503  bool coerce = false;
504  bool makedir = false;
505  bool dynSrc = false;
506  bool delegate = false;
507  bool preserveXAttr = false;
508  bool rmOnBadCksum = false;
509  bool continue_ = false;
510  bool recurse = false;
511  bool zipappend = false;
512  bool doserver = false;
513  std::string thirdParty = "none";
514 
  // DoTpcOnly is tested after DoTpc, so "only" wins when both are wanted
515  if( config.Want( XrdCpConfig::DoPosc ) ) posc = true;
516  if( config.Want( XrdCpConfig::DoForce ) ) force = true;
517  if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true;
518  if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first";
519  if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only";
520  if( config.Want( XrdCpConfig::DoZipAppend ) ) zipappend = true;
521  if( config.Want( XrdCpConfig::DoServer ) ) doserver = true;
522  if( config.Want( XrdCpConfig::DoTpcDlgt ) )
523  {
524  // the env var is being set already here (we are issuing a stat
525  // inhere and we need the env var when we are establishing the
526  // connection and authenticating), but we are also setting a delegate
527  // parameter for CopyJob so it can be used on its own.
  // NOTE(review): the statement the comment above refers to (listing line
  // 528) is absent from this listing
529  delegate = true;
530  }
531  else
  // NOTE(review): the statement belonging to this `else` (listing line 532)
  // is absent; as shown here the `else` would bind to the `if` below
533 
534  if( config.Want( XrdCpConfig::DoRecurse ) )
535  {
536  makedir = true;
537  recurse = true;
538  }
539  if( config.Want( XrdCpConfig::DoPath ) ) makedir = true;
540  if( config.Want( XrdCpConfig::DoDynaSrc ) ) dynSrc = true;
541  if( config.Want( XrdCpConfig::DoXAttr ) ) preserveXAttr = true;
542  if( config.Want( XrdCpConfig::DoRmOnBadCksum ) ) rmOnBadCksum = true;
543  if( config.Want( XrdCpConfig::DoContinue ) ) continue_ = true;
544 
545  if( force && continue_ )
546  {
547  std::cerr << "Invalid argument combination: continue + force." << std::endl;
548  return 50;
549  }
550 
551  //----------------------------------------------------------------------------
552  // Checksums
553  //----------------------------------------------------------------------------
554  std::string checkSumType;
555  std::string checkSumPreset;
556  std::string checkSumMode = "none";
  // CksVal is split on ':'; the first field is the check sum type, a second
  // field is either the word "print" or a preset value
557  if( config.Want( XrdCpConfig::DoCksum ) )
558  {
559  checkSumMode = "end2end";
560  std::vector<std::string> ckSumParams;
561  Utils::splitString( ckSumParams, config.CksVal, ":" );
562  if( ckSumParams.size() > 1 )
563  {
564  if( ckSumParams[1] == "print" )
565  {
566  checkSumMode = "target";
567  progress.PrintTargetCheckSum( true );
568  }
569  else
570  checkSumPreset = ckSumParams[1];
571  }
  // NOTE(review): ckSumParams[0] is read unconditionally, which assumes
  // that splitString produced at least one element - confirm
572  checkSumType = ckSumParams[0];
573  }
574 
  // source check sum: exactly two ':' separated fields are required, the
  // second one is not inspected here
575  if( config.Want( XrdCpConfig::DoCksrc ) )
576  {
577  checkSumMode = "source";
578  std::vector<std::string> ckSumParams;
579  Utils::splitString( ckSumParams, config.CksVal, ":" );
580  if( ckSumParams.size() == 2 )
581  {
582  checkSumMode = "source";
583  checkSumType = ckSumParams[0];
584  progress.PrintSourceCheckSum( true );
585  }
586  else
587  {
588  std::cerr << "Invalid parameter: " << config.CksVal << std::endl;
589  return 50; // generic error
590  }
591  }
592 
593  if( !config.AddCksVal.empty() )
594  progress.PrintAdditionalCheckSum( true );
595 
596  //----------------------------------------------------------------------------
597  // ZIP archive
598  //----------------------------------------------------------------------------
599  std::string zipFile;
600  bool zip = false;
601  if( config.Want( XrdCpConfig::DoZip ) )
602  {
603  zipFile = config.zipFile;
604  zip = true;
605  }
606 
607  //----------------------------------------------------------------------------
608  // Extreme Copy
609  //----------------------------------------------------------------------------
610  int nbSources = 0;
611  bool xcp = false;
612  if( config.Want( XrdCpConfig::DoSources ) )
613  {
614  nbSources = config.nSrcs;
615  xcp = true;
616  }
617 
618  //----------------------------------------------------------------------------
619  // Environment settings
620  //----------------------------------------------------------------------------
  // NOTE(review): `env`, used below, is not declared in the visible part of
  // this function; the listing line numbered 621 is absent, presumably it
  // held the declaration
622 
623  /* Stop PostMaster when exiting main() to ensure proper shutdown */
624  struct scope_exit {
625  ~scope_exit() { XrdCl::DefaultEnv::GetPostMaster()->Stop(); }
626  } stopPostMaster;
627 
628  if( config.nStrm != 0 )
629  env->PutInt( "SubStreamsPerChannel", config.nStrm + 1 /*stands for the control stream*/ );
630 
  // a retry count of -1 leaves the retry settings untouched
631  if( config.Retry != -1 )
632  {
633  env->PutInt( "CpRetry", config.Retry );
634  env->PutString( "CpRetryPolicy", config.RetryPolicy );
635  }
636 
637  if( config.Want( XrdCpConfig::DoNoTlsOK ) )
638  env->PutInt( "NoTlsOK", 1 );
639 
640  if( config.Want( XrdCpConfig::DoTlsNoData ) )
641  env->PutInt( "TlsNoData", 1 );
642 
643  if( config.Want( XrdCpConfig::DoTlsMLF ) )
644  env->PutInt( "TlsMetalink", 1 );
645 
646  if( config.Want( XrdCpConfig::DoZipMtlnCksum ) )
647  env->PutInt( "ZipMtlnCksum", 1 );
648 
  // each value starts from its compiled-in default and is then read from
  // the environment
649  int chunkSize = DefaultCPChunkSize;
650  env->GetInt( "CPChunkSize", chunkSize );
651 
652  int blockSize = DefaultXCpBlockSize;
653  env->GetInt( "XCpBlockSize", blockSize );
654 
655  int parallelChunks = DefaultCPParallelChunks;
656  env->GetInt( "CPParallelChunks", parallelChunks );
  // the number of parallel chunks has to fit into a uint8_t
657  if( parallelChunks < 1 ||
658  parallelChunks > std::numeric_limits<uint8_t>::max() )
659  {
660  std::cerr << "Can only handle between 1 and ";
661  std::cerr << (int)std::numeric_limits<uint8_t>::max();
662  std::cerr << " chunks in parallel. You asked for " << parallelChunks;
663  std::cerr << "." << std::endl;
664  return 50; // generic error
665  }
666 
  // extended attributes are preserved if requested on the command line or,
  // failing that, if the PreserveXAttrs value is non-zero
667  if( !preserveXAttr )
668  {
669  int val = DefaultPreserveXAttrs;
670  env->GetInt( "PreserveXAttrs", val );
671  if( val ) preserveXAttr = true;
672  }
673 
674  log->Dump( AppMsg, "Chunk size: %d, parallel chunks %d, streams: %d",
675  chunkSize, parallelChunks, config.nStrm + 1 );
676 
677  //----------------------------------------------------------------------------
678  // Build the URLs
679  //----------------------------------------------------------------------------
680  std::vector<XrdCl::PropertyList*> resultVect;
681 
  // local destinations become file:// URLs with an absolute path, any other
  // destination path is used as given
682  std::string dest;
683  if( config.dstFile->Protocol == XrdCpFile::isDir ||
684  config.dstFile->Protocol == XrdCpFile::isFile )
685  {
686  dest = "file://";
687 
688  // if it is not an absolute path append cwd
689  if( config.dstFile->Path[0] != '/' )
690  {
691  char buf[FILENAME_MAX];
692  char *cwd = getcwd( buf, FILENAME_MAX );
693  if( !cwd )
694  {
695  XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
696  std::cerr << st.GetErrorMessage() << std::endl;
697  return st.GetShellCode();
698  }
699  dest += cwd;
700  dest += '/';
701  }
702  }
703  dest += config.dstFile->Path;
704 
705  //----------------------------------------------------------------------------
706  // We need to check whether our target is a file or a directory:
707  // 1) it's a file, so we can accept only one source
708  // 2) it's a directory, so:
709  // * we can accept multiple sources
710  // * we need to append the source name
711  //----------------------------------------------------------------------------
712  bool targetIsDir = false;
713  bool targetExists = false;
714  if( config.dstFile->Protocol == XrdCpFile::isDir )
715  targetIsDir = true;
  // NOTE(review): the condition below is cut short, the listing line
  // numbered 717 with its second half is absent
716  else if( config.dstFile->Protocol == XrdCpFile::isXroot ||
718  {
719  URL target( dest );
720  FileSystem fs( target );
721  StatInfo *statInfo = 0;
722  XRootDStatus st = fs.Stat( target.GetPathWithParams(), statInfo );
723  if( st.IsOK() )
724  {
725  if( statInfo->TestFlags( StatInfo::IsDir ) )
726  targetIsDir = true;
727  targetExists = true;
728  }
  // a target that does not exist yet is treated as a directory only if
  // directories may be created and the given path ends with '/'
729  else if( st.errNo == kXR_NotFound && makedir )
730  {
  // NOTE(review): Path[n-1] assumes that the destination path is not
  // empty - confirm that XrdCpConfig guarantees this
731  int n = strlen(config.dstFile->Path);
732  if( config.dstFile->Path[n-1] == '/' )
733  targetIsDir = true;
734  }
735  else if( st.errNo == kXR_NotAuthorized )
736  {
737  log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
738  std::cerr << st.ToStr() << std::endl;
  // NOTE(review): this return skips the `delete statInfo` below;
  // presumably statInfo is still null after a failed Stat - confirm
739  return st.GetShellCode();
740  }
741 
742  delete statInfo;
743  }
744 
  // an existing target file is overwritten only with force, recursive copy
  // or zip append
745  if( !targetIsDir && targetExists && !force && !recurse && !zipappend )
746  {
747  XRootDStatus st( stError, errInvalidOp, EEXIST );
748  // Unable to create /tmp/test.txt; file exists
749  log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
750  std::cerr << "Run: " << st.ToStr() << std::endl;
751  return st.GetShellCode();
752  }
753 
754  //----------------------------------------------------------------------------
755  // If we have multiple sources and target is neither a directory nor stdout
756  // then we cannot proceed
757  //----------------------------------------------------------------------------
758  if( CountSources(config.srcFile) > 1 && !targetIsDir &&
759  config.dstFile->Protocol != XrdCpFile::isStdIO )
760  {
761  std::cerr << "Multiple sources were given but target is not a directory.";
762  std::cerr << std::endl;
763  return 50; // generic error
764  }
765 
766  //----------------------------------------------------------------------------
767  // If we're doing remote recursive copy, chain all the files (if it's a
768  // directory)
769  //----------------------------------------------------------------------------
770  bool remoteSrcIsDir = false;
  // only the first source is examined here
771  if( config.Want( XrdCpConfig::DoRecurse ) &&
772  (config.srcFile->Protocol == XrdCpFile::isXroot ||
773  config.srcFile->Protocol == XrdCpFile::isXroots) )
774  {
775  URL source( config.srcFile->Path );
776  FileSystem *fs = new FileSystem( source );
777  StatInfo *statInfo = 0;
778 
779  XRootDStatus st = fs->Stat( source.GetPath(), statInfo );
780  if( st.IsOK() && statInfo->TestFlags( StatInfo::IsDir ) )
781  {
782  remoteSrcIsDir = true;
783  //------------------------------------------------------------------------
784  // Recursively index the remote directory
785  //------------------------------------------------------------------------
  // the directory entry is replaced by the list of files found in it
786  delete config.srcFile;
787  std::string url = source.GetURL();
788  config.srcFile = IndexRemote( fs, url, url.size() );
789  if ( !config.srcFile )
790  {
  // NOTE(review): no new line is written after this message, and the
  // return skips the `delete fs` / `delete statInfo` below
791  std::cerr << "Error indexing remote directory.";
792  return 50; // generic error
793  }
794  }
795 
796  delete fs;
797  delete statInfo;
798  }
799 
800  XrdCpFile *sourceFile = config.srcFile;
801  //----------------------------------------------------------------------------
802  // Process the sources
803  //----------------------------------------------------------------------------
804  while( sourceFile )
805  {
806  AdjustFileInfo( sourceFile );
807 
808  //--------------------------------------------------------------------------
809  // Create a job for every source
810  //--------------------------------------------------------------------------
811  PropertyList properties;
812  PropertyList *results = new PropertyList;
813  std::string source = sourceFile->Path;
814  if( sourceFile->Protocol == XrdCpFile::isFile )
815  {
816  // make sure it is an absolute path
817  if( source[0] == '/' )
818  source = "file://" + source;
819  else
820  {
821  char buf[FILENAME_MAX];
822  char *cwd = getcwd( buf, FILENAME_MAX );
823  if( !cwd )
824  {
825  XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
826  std::cerr << st.GetErrorMessage() << std::endl;
  // NOTE(review): this return leaves `results` and the entries already
  // stored in resultVect undeleted (the process is exiting anyway)
827  return st.GetShellCode();
828  }
829  source = "file://" + std::string( cwd ) + '/' + source;
830  }
831  }
832 
833  AppendCGI( source, config.srcOpq );
834 
  // the paths are passed through obfuscateAuth only when the log level is
  // at least Dump, the level of the message they are used in
835  std::string sourcePathObf = sourceFile->Path;
836  std::string destPathObf = dest;
837  if( unlikely(log->GetLevel() >= Log::DumpMsg) ) {
838  sourcePathObf = obfuscateAuth(sourcePathObf);
839  destPathObf = obfuscateAuth(destPathObf);
840  }
841  log->Dump( AppMsg, "Processing source entry: %s, type %s, target file: %s, logLevel = %d",
842  sourcePathObf.c_str(), FileType2String( sourceFile->Protocol ),
843  destPathObf.c_str(), log->GetLevel() );
844 
845  //--------------------------------------------------------------------------
846  // Set up the job
847  //--------------------------------------------------------------------------
848  std::string target = dest;
849 
850 
851  bool srcIsDir = false;
852  // if this is local file, for a directory Dlen + Doff will overlap with path size
853  if( strncmp( sourceFile->ProtName, "file", 4 ) == 0 )
854  srcIsDir = std::string( sourceFile->Path ).size() == size_t( sourceFile->Doff + sourceFile->Dlen );
855  // otherwise we are handling a remote file
856  else
857  srcIsDir = remoteSrcIsDir;
858  // if this is a recursive copy make sure we preserve the directory structure
859  if( config.Want( XrdCpConfig::DoRecurse ) && srcIsDir )
860  {
861  // get the source directory
862  std::string srcDir( sourceFile->Path, sourceFile->Doff );
863  // remove the trailing slash
864  if( srcDir[srcDir.size() - 1] == '/' )
865  srcDir = srcDir.substr( 0, srcDir.size() - 1 );
866  size_t diroff = srcDir.rfind( '/' );
867  // if there is no '/' it means a directory name has been given as relative path
868  if( diroff == std::string::npos ) diroff = 0;
869  target += '/';
870  target += sourceFile->Path + diroff;
871  // remove the filename from destination path as it will be appended later anyway
872  target = target.substr( 0 , target.rfind('/') );
873  }
874  AppendCGI( target, config.dstOpq );
875 
876  properties.Set( "source", source );
877  properties.Set( "target", target );
878  properties.Set( "force", force );
879  properties.Set( "posc", posc );
880  properties.Set( "coerce", coerce );
881  properties.Set( "makeDir", makedir );
882  properties.Set( "dynamicSource", dynSrc );
883  properties.Set( "thirdParty", thirdParty );
884  properties.Set( "checkSumMode", checkSumMode );
885  properties.Set( "checkSumType", checkSumType );
886  properties.Set( "checkSumPreset", checkSumPreset );
887  properties.Set( "chunkSize", chunkSize );
888  properties.Set( "parallelChunks", parallelChunks );
889  properties.Set( "zipArchive", zip );
890  properties.Set( "xcp", xcp );
891  properties.Set( "xcpBlockSize", blockSize );
892  properties.Set( "delegate", delegate );
893  properties.Set( "targetIsDir", targetIsDir );
894  properties.Set( "preserveXAttr", preserveXAttr );
895  properties.Set( "xrate", config.xRate );
896  properties.Set( "xrateThreshold", config.xRateThreshold );
897  properties.Set( "rmOnBadCksum", rmOnBadCksum );
898  properties.Set( "continue", continue_ );
899  properties.Set( "zipAppend", zipappend );
900  properties.Set( "addcksums", config.AddCksVal );
901  properties.Set( "doServer", doserver );
902 
903  if( zip )
904  properties.Set( "zipSource", zipFile );
905 
906  if( xcp )
907  properties.Set( "nbXcpSources", nbSources );
908 
909 
  // a job that cannot be added is only reported; the loop carries on and
  // its results object is still stored in resultVect
910  XRootDStatus st = process.AddJob( properties, results );
911  if( !st.IsOK() )
912  {
913  std::cerr << "AddJob " << source << " -> " << target << ": ";
914  std::cerr << st.ToStr() << std::endl;
915  }
916  resultVect.push_back( results );
917  sourceFile = sourceFile->Next;
918  }
919 
920  //----------------------------------------------------------------------------
921  // Configure the copy process
922  //----------------------------------------------------------------------------
  // the configuration is handed over as a job of type "configuration"
  // without a results object
923  PropertyList processConfig;
924  processConfig.Set( "jobType", "configuration" );
925  processConfig.Set( "parallel", config.Parallel );
926  process.AddJob( processConfig, 0 );
927 
928  //----------------------------------------------------------------------------
929  // Prepare and run the copy process
930  //----------------------------------------------------------------------------
931  XRootDStatus st = process.Prepare();
932  if( !st.IsOK() )
933  {
934  CleanUpResults( resultVect );
935  std::cerr << "Prepare: " << st.ToStr() << std::endl;
936  return st.GetShellCode();
937  }
938 
939  st = process.Run( &progress );
940  if( !st.IsOK() )
941  {
  // a single job reports the overall status, several jobs get a per-job
  // error report followed by a summary line
942  if( resultVect.size() == 1 )
943  std::cerr << "Run: " << st.ToStr() << std::endl;
944  else
945  {
946  std::vector<XrdCl::PropertyList*>::iterator it;
947  uint16_t i = 1;
948  uint16_t jobsRun = 0;
949  uint16_t errors = 0;
950  for( it = resultVect.begin(); it != resultVect.end(); ++it, ++i )
951  {
  // results without a "status" entry are not counted as run
952  if( !(*it)->HasProperty( "status" ) )
953  continue;
954 
  // NOTE(review): this `st` shadows the overall status declared above;
  // the shell code returned below is that of the outer one
955  XRootDStatus st = (*it)->Get<XRootDStatus>("status");
956  if( !st.IsOK() )
957  {
  // NOTE(review): no new line is written after the job error, so the
  // reports of several jobs end up on one line
958  std::cerr << "Job #" << i << ": " << st.ToStr();
959  ++errors;
960  }
961  ++jobsRun;
962  }
963  std::cerr << "Jobs total: " << resultVect.size();
964  std::cerr << ", run: " << jobsRun;
965  std::cerr << ", errors: " << errors << std::endl;
966  }
967  CleanUpResults( resultVect );
968  return st.GetShellCode();
969  }
970  CleanUpResults( resultVect );
971  return 0;
972 }
973 
@ kXR_NotAuthorized
Definition: XProtocol.hh:1000
@ kXR_NotFound
Definition: XProtocol.hh:1001
bool AllOptionsSupported(XrdCpConfig *config)
Definition: XrdClCopy.cc:310
const char * FileType2String(XrdCpFile::PType type)
Definition: XrdClCopy.cc:366
int main(int argc, char **argv)
Definition: XrdClCopy.cc:472
XrdCpFile * IndexRemote(XrdCl::FileSystem *fs, std::string basePath, long dirOffset)
Definition: XrdClCopy.cc:412
void ProcessCommandLineEnv(XrdCpConfig *config)
Definition: XrdClCopy.cc:344
void CleanUpResults(std::vector< XrdCl::PropertyList * > &results)
Definition: XrdClCopy.cc:462
void AdjustFileInfo(XrdCpFile *file)
Definition: XrdClCopy.cc:394
uint32_t CountSources(XrdCpFile *file)
Definition: XrdClCopy.cc:384
void AppendCGI(std::string &url, const char *newCGI)
Definition: XrdClCopy.cc:324
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void PrintAdditionalCheckSum(bool print)
Definition: XrdClCopy.cc:284
void PrintSourceCheckSum(bool print)
Definition: XrdClCopy.cc:282
virtual void EndJob(uint16_t jobNum, const XrdCl::PropertyList *results)
End job.
Definition: XrdClCopy.cc:88
void PrintProgressBar(bool print)
Definition: XrdClCopy.cc:281
void PrintCheckSum(const XrdCl::URL *url, const std::string &checkSum, uint64_t size)
Print the checksum.
Definition: XrdClCopy.cc:256
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
Job progress.
Definition: XrdClCopy.cc:221
std::string GetProgressBar(time_t now)
Get progress bar.
Definition: XrdClCopy.cc:148
virtual void BeginJob(uint16_t jobNum, uint16_t jobTotal, const XrdCl::URL *source, const XrdCl::URL *destination)
Begin job.
Definition: XrdClCopy.cc:61
std::string GetSummaryBar(time_t now)
Get summary bar.
Definition: XrdClCopy.cc:190
ProgressDisplay()
Constructor.
Definition: XrdClCopy.cc:53
void PrintTargetCheckSum(bool print)
Definition: XrdClCopy.cc:283
static int mapError(int rc)
Definition: XProtocol.hh:1362
Copy the data from one point to another.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus Prepare()
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
const std::string & GetName() const
Get file name.
StatInfo * GetStatInfo()
Get the stat info object.
Iterator End()
Get the end iterator.
Iterator Begin()
Get the begin iterator.
void Enable()
Enable delegation in the environment.
Definition: XrdClDlgEnv.hh:47
static DlgEnv & Instance()
Definition: XrdClDlgEnv.hh:28
void Disable()
Disable delegation in the environment.
Definition: XrdClDlgEnv.hh:55
bool PutInt(const std::string &key, int value)
Definition: XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition: XrdClEnv.cc:52
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus DirList(const std::string &path, DirListFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Handle diagnostics.
Definition: XrdClLog.hh:101
@ InfoMsg
print info
Definition: XrdClLog.hh:111
@ DebugMsg
print debug info
Definition: XrdClLog.hh:112
@ DumpMsg
print details of the request and responses
Definition: XrdClLog.hh:113
void SetLevel(LogLevel level)
Set the level of the messages that should be sent to the destination.
Definition: XrdClLog.hh:193
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
bool Stop()
Stop the postmaster.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Object stat info.
bool TestFlags(uint32_t flags) const
Test flags.
@ IsDir
This is a directory.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
std::string GetPathWithParams() const
Get the path with params.
Definition: XrdClURL.cc:318
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static std::string BytesToString(uint64_t bytes)
Convert bytes to a human readable string.
Definition: XrdClUtils.cc:367
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
short Doff
Definition: XrdCpFile.hh:46
PType Protocol
Definition: XrdCpFile.hh:49
char * Path
Definition: XrdCpFile.hh:45
char ProtName[8]
Definition: XrdCpFile.hh:50
XrdCpFile * Next
Definition: XrdCpFile.hh:44
short Dlen
Definition: XrdCpFile.hh:47
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t AppMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const int DefaultCPParallelChunks
const int DefaultXCpBlockSize
const int DefaultPreserveXAttrs
@ Merge
Merge duplicates.
@ Recursive
Do a recursive listing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148
int GetShellCode() const
Get the status code that may be returned to the shell.
Definition: XrdClStatus.hh:129
const char * vName
Definition: XrdCpConfig.hh:53
defVar * intDefs
Definition: XrdCpConfig.hh:63
void Config(int argc, char **argv, int Opts=0)
Definition: XrdCpConfig.cc:194
std::vector< std::string > AddCksVal
Definition: XrdCpConfig.hh:97
const char * dstOpq
Definition: XrdCpConfig.hh:65
static const uint64_t DoZipMtlnCksum
Definition: XrdCpConfig.hh:189
XrdCpFile * srcFile
Definition: XrdCpConfig.hh:90
XrdCpFile * dstFile
Definition: XrdCpConfig.hh:91
char * zipFile
Definition: XrdCpConfig.hh:93
static const uint64_t DoNoPbar
Definition: XrdCpConfig.hh:122
static const uint64_t DoCoerce
Definition: XrdCpConfig.hh:105
static const uint64_t DoForce
Definition: XrdCpConfig.hh:111
static const uint64_t DoRmOnBadCksum
Definition: XrdCpConfig.hh:192
static const uint64_t DoNoTlsOK
Definition: XrdCpConfig.hh:177
static const uint64_t DoTpc
Definition: XrdCpConfig.hh:150
char * pHost
Definition: XrdCpConfig.hh:71
static const uint64_t DoCksum
Definition: XrdCpConfig.hh:101
defVar * strDefs
Definition: XrdCpConfig.hh:64
static const uint64_t DoCksrc
Definition: XrdCpConfig.hh:100
static const uint64_t DoTpcDlgt
Definition: XrdCpConfig.hh:152
static const uint64_t DoZip
Definition: XrdCpConfig.hh:171
static const uint64_t DoContinue
Definition: XrdCpConfig.hh:195
const char * CksVal
Definition: XrdCpConfig.hh:88
static const uint64_t DoRecurse
Definition: XrdCpConfig.hh:132
const char * srcOpq
Definition: XrdCpConfig.hh:66
static const uint64_t DoZipAppend
Definition: XrdCpConfig.hh:204
static const uint64_t DoDynaSrc
Definition: XrdCpConfig.hh:166
int Want(uint64_t What)
Definition: XrdCpConfig.hh:226
long long xRate
Definition: XrdCpConfig.hh:68
static const uint64_t DoSources
Definition: XrdCpConfig.hh:144
static const uint64_t DoXAttr
Definition: XrdCpConfig.hh:186
static const uint64_t DoTlsMLF
Definition: XrdCpConfig.hh:180
static const int optRmtRec
Definition: XrdCpConfig.hh:218
std::string RetryPolicy
Definition: XrdCpConfig.hh:78
static const uint64_t DoPath
Definition: XrdCpConfig.hh:183
static const uint64_t DoPosc
Definition: XrdCpConfig.hh:125
long long xRateThreshold
Definition: XrdCpConfig.hh:69
static const uint64_t DoTpcOnly
Definition: XrdCpConfig.hh:151
static const uint64_t DoTlsNoData
Definition: XrdCpConfig.hh:174
static const uint64_t DoServer
Definition: XrdCpConfig.hh:138