XRootD
Loading...
Searching...
No Matches
XrdClCopy.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdApps/XrdCpFile.hh"
30#include "XrdCl/XrdClLog.hh"
32#include "XrdCl/XrdClUtils.hh"
33#include "XrdCl/XrdClDlgEnv.hh"
35#include "XrdSys/XrdSysE2T.hh"
38
39#include <cstdio>
40#include <iostream>
41#include <iomanip>
42#include <limits>
43
44//------------------------------------------------------------------------------
45// Progress notifier
46//------------------------------------------------------------------------------
48{
49 public:
50 //--------------------------------------------------------------------------
52 //--------------------------------------------------------------------------
53 ProgressDisplay(): pPrevious(0), pPrintProgressBar(true),
54 pPrintSourceCheckSum(false), pPrintTargetCheckSum(false),
55 pPrintAdditionalCheckSum(false)
56 {}
57
58 //--------------------------------------------------------------------------
60 //--------------------------------------------------------------------------
61 virtual void BeginJob( uint16_t jobNum,
62 uint16_t jobTotal,
63 const XrdCl::URL *source,
64 const XrdCl::URL *destination )
65 {
66 XrdSysMutexHelper scopedLock( pMutex );
67 if( pPrintProgressBar )
68 {
69 if( jobTotal > 1 )
70 {
71 std::cerr << "Job: " << jobNum << "/" << jobTotal << std::endl;
72 std::cerr << "Source: " << source->GetURL() << std::endl;
73 std::cerr << "Target: " << destination->GetURL() << std::endl;
74 }
75 }
76 pPrevious = 0;
77
78 JobData d;
79 d.started = time(0);
80 d.source = source;
81 d.target = destination;
82 pOngoingJobs[jobNum] = d;
83 }
84
85 //--------------------------------------------------------------------------
87 //--------------------------------------------------------------------------
88 virtual void EndJob( uint16_t jobNum, const XrdCl::PropertyList *results )
89 {
90 XrdSysMutexHelper scopedLock( pMutex );
91
92 std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
93 if( it == pOngoingJobs.end() )
94 return;
95
96 JobData &d = it->second;
97
98 // make sure the last available status was printed, which may not be
99 // the case when processing stdio since we throttle printing and don't
100 // know the total size
101 JobProgress( jobNum, d.bytesProcessed, d.bytesTotal );
102
103 if( pPrintProgressBar )
104 {
105 if( pOngoingJobs.size() > 1 )
106 std::cerr << "\r" << std::string(70, ' ') << "\r";
107 else
108 std::cerr << std::endl;
109 }
110
112 results->Get( "status", st );
113 if( !st.IsOK() )
114 {
115 pOngoingJobs.erase(it);
116 return;
117 }
118
119 std::string checkSum;
120 uint64_t size;
121 results->Get( "size", size );
122 if( pPrintSourceCheckSum )
123 {
124 results->Get( "sourceCheckSum", checkSum );
125 PrintCheckSum( d.source, checkSum, size );
126 }
127
128 if( pPrintTargetCheckSum )
129 {
130 results->Get( "targetCheckSum", checkSum );
131 PrintCheckSum( d.target, checkSum, size );
132 }
133
134 if( pPrintAdditionalCheckSum )
135 {
136 std::vector<std::string> addcksums;
137 results->Get( "additionalCkeckSum", addcksums );
138 for( auto &cks : addcksums )
139 PrintCheckSum( d.source, cks, size );
140 }
141
142 pOngoingJobs.erase(it);
143 }
144
145 //--------------------------------------------------------------------------
147 //--------------------------------------------------------------------------
148 std::string GetProgressBar( time_t now )
149 {
150 JobData &d = pOngoingJobs.begin()->second;
151
152 uint64_t speed = 0;
153 if( now-d.started )
154 speed = d.bytesProcessed/(now-d.started);
155 else
156 speed = d.bytesProcessed;
157
158 std::string bar;
159 int prog = 0;
160 int proc = 0;
161
162 if( d.bytesTotal )
163 {
164 prog = (int)((double)d.bytesProcessed/d.bytesTotal*50);
165 proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
166 }
167 else
168 {
169 prog = 50;
170 proc = 100;
171 }
172 bar.append( prog, '=' );
173 if( prog < 50 )
174 bar += ">";
175
176 std::ostringstream o;
177 o << "[" << XrdCl::Utils::BytesToString(d.bytesProcessed) << "B/";
178 o << XrdCl::Utils::BytesToString(d.bytesTotal) << "B]";
179 o << "[" << std::setw(3) << std::right << proc << "%]";
180 o << "[" << std::setw(50) << std::left;
181 o << bar;
182 o << "]";
183 o << "[" << XrdCl::Utils::BytesToString(speed) << "B/s] ";
184 return o.str();
185 }
186
187 //--------------------------------------------------------------------------
189 //--------------------------------------------------------------------------
190 std::string GetSummaryBar( time_t now )
191 {
192 std::map<uint16_t, JobData>::iterator it;
193 std::ostringstream o;
194
195 for( it = pOngoingJobs.begin(); it != pOngoingJobs.end(); ++it )
196 {
197 JobData &d = it->second;
198 uint16_t jobNum = it->first;
199
200 uint64_t speed = 0;
201 if( now-d.started )
202 speed = d.bytesProcessed/(now-d.started);
203
204 int proc = 0;
205 if( d.bytesTotal )
206 proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
207 else
208 proc = 100;
209
210 o << "[#" << jobNum << ": ";
211 o << proc << "% ";
212 o << XrdCl::Utils::BytesToString(speed) << "B/s] ";
213 }
214 o << " ";
215 return o.str();
216 }
217
218 //--------------------------------------------------------------------------
220 //--------------------------------------------------------------------------
221 virtual void JobProgress( uint16_t jobNum,
222 uint64_t bytesProcessed,
223 uint64_t bytesTotal )
224 {
225 XrdSysMutexHelper scopedLock( pMutex );
226
227 if( pPrintProgressBar )
228 {
229 time_t now = time(0);
230 if( (now - pPrevious < 1) && (bytesProcessed != bytesTotal) )
231 return;
232 pPrevious = now;
233
234 std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
235 if( it == pOngoingJobs.end() )
236 return;
237
238 JobData &d = it->second;
239
240 d.bytesProcessed = bytesProcessed;
241 d.bytesTotal = bytesTotal;
242
243 std::string progress;
244 if( pOngoingJobs.size() == 1 )
245 progress = GetProgressBar( now );
246 else
247 progress = GetSummaryBar( now );
248
249 std::cerr << "\r" << progress << std::flush;
250 }
251 }
252
253 //--------------------------------------------------------------------------
255 //--------------------------------------------------------------------------
256 void PrintCheckSum( const XrdCl::URL *url,
257 const std::string &checkSum,
258 uint64_t size )
259 {
260 if( checkSum.empty() )
261 return;
262 std::string::size_type i = checkSum.find( ':' );
263 std::cerr << checkSum.substr( 0, i+1 ) << " ";
264 std::cerr << checkSum.substr( i+1, checkSum.length()-i ) << " ";
265
266 if( url->IsLocalFile() )
267 std::cerr << url->GetPath() << " ";
268 else
269 {
270 std::cerr << url->GetProtocol() << "://" << url->GetHostId();
271 std::cerr << url->GetPath() << " ";
272 }
273
274 std::cerr << size;
275 std::cerr << std::endl;
276 }
277
278 //--------------------------------------------------------------------------
279 // Printing flags
280 //--------------------------------------------------------------------------
281 void PrintProgressBar( bool print ) { pPrintProgressBar = print; }
282 void PrintSourceCheckSum( bool print ) { pPrintSourceCheckSum = print; }
283 void PrintTargetCheckSum( bool print ) { pPrintTargetCheckSum = print; }
284 void PrintAdditionalCheckSum( bool print ) { pPrintAdditionalCheckSum = print; }
285
286 private:
287 struct JobData
288 {
289 JobData(): bytesProcessed(0), bytesTotal(0),
290 started(0), source(0), target(0) {}
291 uint64_t bytesProcessed;
292 uint64_t bytesTotal;
293 time_t started;
294 const XrdCl::URL *source;
295 const XrdCl::URL *target;
296 };
297
298 time_t pPrevious;
299 bool pPrintProgressBar;
300 bool pPrintSourceCheckSum;
301 bool pPrintTargetCheckSum;
302 bool pPrintAdditionalCheckSum;
303 std::map<uint16_t, JobData> pOngoingJobs;
304 XrdSysRecMutex pMutex;
305};
306
307//------------------------------------------------------------------------------
308// Check if we support all the specified user options
309//------------------------------------------------------------------------------
311{
312 if( config->pHost )
313 {
314 std::cerr << "SOCKS Proxies are not yet supported" << std::endl;
315 return false;
316 }
317
318 return true;
319}
320
321//------------------------------------------------------------------------------
322// Append extra cgi info to existing URL
323//------------------------------------------------------------------------------
324void AppendCGI( std::string &url, const char *newCGI )
325{
326 if( !newCGI || !(*newCGI) )
327 return;
328
329 if( *newCGI == '&' )
330 ++newCGI;
331
332 if( url.find( '?' ) == std::string::npos )
333 url += "?";
334
335 if( url.find( '&' ) == std::string::npos )
336 url += "&";
337
338 url += newCGI;
339}
340
341//------------------------------------------------------------------------------
342// Process commandline environment settings
343//------------------------------------------------------------------------------
345{
347
348 XrdCpConfig::defVar *cursor = config->intDefs;
349 while( cursor )
350 {
351 env->PutInt( cursor->vName, cursor->intVal );
352 cursor = cursor->Next;
353 }
354
355 cursor = config->strDefs;
356 while( cursor )
357 {
358 env->PutString( cursor->vName, cursor->strVal );
359 cursor = cursor->Next;
360 }
361}
362
363//------------------------------------------------------------------------------
364// Translate file type to a string for diagnostics purposes
365//------------------------------------------------------------------------------
367{
368 switch( type )
369 {
370 case XrdCpFile::isDir: return "directory";
371 case XrdCpFile::isFile: return "local file";
372 case XrdCpFile::isXroot: return "xroot";
373 case XrdCpFile::isXroots: return "xroots";
374 case XrdCpFile::isHttp: return "http";
375 case XrdCpFile::isHttps: return "https";
376 case XrdCpFile::isStdIO: return "stdio";
377 default: return "other";
378 };
379}
380
381//------------------------------------------------------------------------------
382// Count the sources
383//------------------------------------------------------------------------------
384uint32_t CountSources( XrdCpFile *file )
385{
386 uint32_t count;
387 for( count = 0; file; file = file->Next, ++count ) {};
388 return count;
389}
390
391//------------------------------------------------------------------------------
392// Adjust file information for the cases when XrdCpConfig cannot do this
393//------------------------------------------------------------------------------
395{
396 //----------------------------------------------------------------------------
397 // If the file is url and the directory offset is not set we set it
398 // to the last slash
399 //----------------------------------------------------------------------------
400 if( file->Doff == 0 )
401 {
402 char *slash = file->Path;
403 for( ; *slash; ++slash ) {};
404 for( ; *slash != '/' && slash > file->Path; --slash ) {};
405 file->Doff = slash - file->Path;
406 }
407};
408
409//------------------------------------------------------------------------------
410// Recursively index all files and directories inside a remote directory
411//------------------------------------------------------------------------------
413 std::string basePath,
414 long dirOffset )
415{
416 using namespace XrdCl;
417
418 Log *log = DefaultEnv::GetLog();
419 log->Debug( AppMsg, "Indexing %s", basePath.c_str() );
420
421 DirectoryList *dirList = 0;
422 XRootDStatus st = fs->DirList( URL( basePath ).GetPath(), DirListFlags::Recursive
423 | DirListFlags::Locate | DirListFlags::Merge, dirList );
424 if( !st.IsOK() )
425 {
426 log->Info( AppMsg, "Failed to get directory listing for %s: %s",
427 basePath.c_str(),
428 st.GetErrorMessage().c_str() );
429 return 0;
430 }
431
432 XrdCpFile start, *current = 0;
433 XrdCpFile *end = &start;
434 int badUrl = 0;
435 for( auto itr = dirList->Begin(); itr != dirList->End(); ++itr )
436 {
437 DirectoryList::ListEntry *e = *itr;
438 if( e->GetStatInfo()->TestFlags( StatInfo::IsDir ) )
439 continue;
440 std::string path = basePath + '/' + e->GetName();
441 current = new XrdCpFile( path.c_str(), badUrl );
442 if( badUrl )
443 {
444 log->Error( AppMsg, "Bad URL: %s", current->Path );
445 delete current;
446 return 0;
447 }
448
449 current->Doff = dirOffset;
450 end->Next = current;
451 end = current;
452 }
453
454 delete dirList;
455
456 return start.Next;
457}
458
459//------------------------------------------------------------------------------
460// Clean up the copy job descriptors
461//------------------------------------------------------------------------------
462void CleanUpResults( std::vector<XrdCl::PropertyList *> &results )
463{
464 std::vector<XrdCl::PropertyList *>::iterator it;
465 for( it = results.begin(); it != results.end(); ++it )
466 delete *it;
467}
468
469//--------------------------------------------------------------------------
470// Let the show begin
471//------------------------------------------------------------------------------
472int main( int argc, char **argv )
473{
474 using namespace XrdCl;
475
476 //----------------------------------------------------------------------------
477 // Configure the copy command, if it returns then everything went well, ugly
478 //----------------------------------------------------------------------------
479 XrdCpConfig config( argv[0] );
480 config.Config( argc, argv, XrdCpConfig::optRmtRec );
481 if( !AllOptionsSupported( &config ) )
482 return 50; // generic error
483 ProcessCommandLineEnv( &config );
484
485 //----------------------------------------------------------------------------
486 // Set options
487 //----------------------------------------------------------------------------
488 CopyProcess process;
489 Log *log = DefaultEnv::GetLog();
490 if( config.Dlvl )
491 {
492 if( config.Dlvl == 1 ) log->SetLevel( Log::InfoMsg );
493 else if( config.Dlvl == 2 ) log->SetLevel( Log::DebugMsg );
494 else if( config.Dlvl == 3 ) log->SetLevel( Log::DumpMsg );
495 }
496
497 ProgressDisplay progress;
498 if( config.Want(XrdCpConfig::DoNoPbar) || !isatty( fileno( stdout ) ) )
499 progress.PrintProgressBar( false );
500
501 bool posc = false;
502 bool force = false;
503 bool coerce = false;
504 bool makedir = false;
505 bool dynSrc = false;
506 bool delegate = false;
507 bool preserveXAttr = false;
508 bool rmOnBadCksum = false;
509 bool continue_ = false;
510 bool recurse = false;
511 bool zipappend = false;
512 bool doserver = false;
513 std::string thirdParty = "none";
514
515 if( config.Want( XrdCpConfig::DoPosc ) ) posc = true;
516 if( config.Want( XrdCpConfig::DoForce ) ) force = true;
517 if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true;
518 if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first";
519 if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only";
520 if( config.Want( XrdCpConfig::DoZipAppend ) ) zipappend = true;
521 if( config.Want( XrdCpConfig::DoServer ) ) doserver = true;
522 if( config.Want( XrdCpConfig::DoTpcDlgt ) )
523 {
524 // the env var is being set already here (we are issuing a stat
525 // inhere and we need the env var when we are establishing the
526 // connection and authenticating), but we are also setting a delegate
527 // parameter for CopyJob so it can be used on its own.
528 DlgEnv::Instance().Enable();
529 delegate = true;
530 }
531 else
532 DlgEnv::Instance().Disable();
533
534 if( config.Want( XrdCpConfig::DoRecurse ) )
535 {
536 makedir = true;
537 recurse = true;
538 }
539 if( config.Want( XrdCpConfig::DoPath ) ) makedir = true;
540 if( config.Want( XrdCpConfig::DoDynaSrc ) ) dynSrc = true;
541 if( config.Want( XrdCpConfig::DoXAttr ) ) preserveXAttr = true;
542 if( config.Want( XrdCpConfig::DoRmOnBadCksum ) ) rmOnBadCksum = true;
543 if( config.Want( XrdCpConfig::DoContinue ) ) continue_ = true;
544
545 if( force && continue_ )
546 {
547 std::cerr << "Invalid argument combination: continue + force." << std::endl;
548 return 50;
549 }
550
551 //----------------------------------------------------------------------------
552 // Checksums
553 //----------------------------------------------------------------------------
554 std::string checkSumType;
555 std::string checkSumPreset;
556 std::string checkSumMode = "none";
557 if( config.Want( XrdCpConfig::DoCksum ) )
558 {
559 checkSumMode = "end2end";
560 std::vector<std::string> ckSumParams;
561 Utils::splitString( ckSumParams, config.CksVal, ":" );
562 if( ckSumParams.size() > 1 )
563 {
564 if( ckSumParams[1] == "print" )
565 {
566 checkSumMode = "target";
567 progress.PrintTargetCheckSum( true );
568 }
569 else
570 checkSumPreset = ckSumParams[1];
571 }
572 checkSumType = ckSumParams[0];
573 }
574
575 if( config.Want( XrdCpConfig::DoCksrc ) )
576 {
577 checkSumMode = "source";
578 std::vector<std::string> ckSumParams;
579 Utils::splitString( ckSumParams, config.CksVal, ":" );
580 if( ckSumParams.size() == 2 )
581 {
582 checkSumMode = "source";
583 checkSumType = ckSumParams[0];
584 progress.PrintSourceCheckSum( true );
585 }
586 else
587 {
588 std::cerr << "Invalid parameter: " << config.CksVal << std::endl;
589 return 50; // generic error
590 }
591 }
592
593 if( !config.AddCksVal.empty() )
594 progress.PrintAdditionalCheckSum( true );
595
596 //----------------------------------------------------------------------------
597 // ZIP archive
598 //----------------------------------------------------------------------------
599 std::string zipFile;
600 bool zip = false;
601 if( config.Want( XrdCpConfig::DoZip ) )
602 {
603 zipFile = config.zipFile;
604 zip = true;
605 }
606
607 //----------------------------------------------------------------------------
608 // Extreme Copy
609 //----------------------------------------------------------------------------
610 int nbSources = 0;
611 bool xcp = false;
612 if( config.Want( XrdCpConfig::DoSources ) )
613 {
614 nbSources = config.nSrcs;
615 xcp = true;
616 }
617
618 //----------------------------------------------------------------------------
619 // Environment settings
620 //----------------------------------------------------------------------------
622
623 /* Stop PostMaster when exiting main() to ensure proper shutdown */
624 struct scope_exit {
625 ~scope_exit() { XrdCl::DefaultEnv::GetPostMaster()->Stop(); }
626 } stopPostMaster;
627
628 if( config.nStrm != 0 )
629 env->PutInt( "SubStreamsPerChannel", config.nStrm + 1 /*stands for the control stream*/ );
630
631 if( config.Retry != -1 )
632 {
633 env->PutInt( "CpRetry", config.Retry );
634 env->PutString( "CpRetryPolicy", config.RetryPolicy );
635 }
636
637 if( config.Want( XrdCpConfig::DoNoTlsOK ) )
638 env->PutInt( "NoTlsOK", 1 );
639
640 if( config.Want( XrdCpConfig::DoTlsNoData ) )
641 env->PutInt( "TlsNoData", 1 );
642
643 if( config.Want( XrdCpConfig::DoTlsMLF ) )
644 env->PutInt( "TlsMetalink", 1 );
645
646 if( config.Want( XrdCpConfig::DoZipMtlnCksum ) )
647 env->PutInt( "ZipMtlnCksum", 1 );
648
649 int chunkSize = DefaultCPChunkSize;
650 env->GetInt( "CPChunkSize", chunkSize );
651
652 int blockSize = DefaultXCpBlockSize;
653 env->GetInt( "XCpBlockSize", blockSize );
654
655 int parallelChunks = DefaultCPParallelChunks;
656 env->GetInt( "CPParallelChunks", parallelChunks );
657 if( parallelChunks < 1 ||
658 parallelChunks > std::numeric_limits<uint8_t>::max() )
659 {
660 std::cerr << "Can only handle between 1 and ";
661 std::cerr << (int)std::numeric_limits<uint8_t>::max();
662 std::cerr << " chunks in parallel. You asked for " << parallelChunks;
663 std::cerr << "." << std::endl;
664 return 50; // generic error
665 }
666
667 if( !preserveXAttr )
668 {
669 int val = DefaultPreserveXAttrs;
670 env->GetInt( "PreserveXAttrs", val );
671 if( val ) preserveXAttr = true;
672 }
673
674 log->Dump( AppMsg, "Chunk size: %d, parallel chunks %d, streams: %d",
675 chunkSize, parallelChunks, config.nStrm + 1 );
676
677 //----------------------------------------------------------------------------
678 // Build the URLs
679 //----------------------------------------------------------------------------
680 std::vector<XrdCl::PropertyList*> resultVect;
681
682 std::string dest;
683 if( config.dstFile->Protocol == XrdCpFile::isDir ||
685 {
686 dest = "file://";
687
688 // if it is not an absolute path append cwd
689 if( config.dstFile->Path[0] != '/' )
690 {
691 char buf[FILENAME_MAX];
692 char *cwd = getcwd( buf, FILENAME_MAX );
693 if( !cwd )
694 {
695 XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
696 std::cerr << st.GetErrorMessage() << std::endl;
697 return st.GetShellCode();
698 }
699 dest += cwd;
700 dest += '/';
701 }
702 }
703 dest += config.dstFile->Path;
704
705 //----------------------------------------------------------------------------
706 // We need to check whether our target is a file or a directory:
707 // 1) it's a file, so we can accept only one source
708 // 2) it's a directory, so:
709 // * we can accept multiple sources
710 // * we need to append the source name
711 //----------------------------------------------------------------------------
712 bool targetIsDir = false;
713 bool targetExists = false;
714 if( config.dstFile->Protocol == XrdCpFile::isDir )
715 targetIsDir = true;
716 else if( config.dstFile->Protocol == XrdCpFile::isXroot ||
718 {
719 URL target( dest );
720 FileSystem fs( target );
721 StatInfo *statInfo = 0;
722 XRootDStatus st = fs.Stat( target.GetPathWithParams(), statInfo );
723 if( st.IsOK() )
724 {
725 if( statInfo->TestFlags( StatInfo::IsDir ) )
726 targetIsDir = true;
727 targetExists = true;
728 }
729 else if( st.errNo == kXR_NotFound && makedir )
730 {
731 int n = strlen(config.dstFile->Path);
732 if( config.dstFile->Path[n-1] == '/' )
733 targetIsDir = true;
734 }
735 else if( st.errNo == kXR_NotAuthorized )
736 {
737 log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
738 std::cerr << st.ToStr() << std::endl;
739 return st.GetShellCode();
740 }
741
742 delete statInfo;
743 }
744
745 if( !targetIsDir && targetExists && !force && !recurse && !zipappend )
746 {
747 XRootDStatus st( stError, errInvalidOp, EEXIST );
748 // Unable to create /tmp/test.txt; file exists
749 log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
750 std::cerr << "Run: " << st.ToStr() << std::endl;
751 return st.GetShellCode();
752 }
753
754 //----------------------------------------------------------------------------
755 // If we have multiple sources and target is neither a directory nor stdout
756 // then we cannot proceed
757 //----------------------------------------------------------------------------
758 if( CountSources(config.srcFile) > 1 && !targetIsDir &&
760 {
761 std::cerr << "Multiple sources were given but target is not a directory.";
762 std::cerr << std::endl;
763 return 50; // generic error
764 }
765
766 //----------------------------------------------------------------------------
767 // If we're doing remote recursive copy, chain all the files (if it's a
768 // directory)
769 //----------------------------------------------------------------------------
770 bool remoteSrcIsDir = false;
771 if( config.Want( XrdCpConfig::DoRecurse ) &&
772 (config.srcFile->Protocol == XrdCpFile::isXroot ||
774 {
775 URL source( config.srcFile->Path );
776 FileSystem *fs = new FileSystem( source );
777 StatInfo *statInfo = 0;
778
779 XRootDStatus st = fs->Stat( source.GetPath(), statInfo );
780 if( st.IsOK() && statInfo->TestFlags( StatInfo::IsDir ) )
781 {
782 remoteSrcIsDir = true;
783 //------------------------------------------------------------------------
784 // Recursively index the remote directory
785 //------------------------------------------------------------------------
786 delete config.srcFile;
787 std::string url = source.GetURL();
788 config.srcFile = IndexRemote( fs, url, url.size() );
789 if ( !config.srcFile )
790 {
791 std::cerr << "Error indexing remote directory.";
792 return 50; // generic error
793 }
794 }
795
796 delete fs;
797 delete statInfo;
798 }
799
800 XrdCpFile *sourceFile = config.srcFile;
801 //----------------------------------------------------------------------------
802 // Process the sources
803 //----------------------------------------------------------------------------
804 while( sourceFile )
805 {
806 AdjustFileInfo( sourceFile );
807
808 //--------------------------------------------------------------------------
809 // Create a job for every source
810 //--------------------------------------------------------------------------
811 PropertyList properties;
812 PropertyList *results = new PropertyList;
813 std::string source = sourceFile->Path;
814 if( sourceFile->Protocol == XrdCpFile::isFile )
815 {
816 // make sure it is an absolute path
817 if( source[0] == '/' )
818 source = "file://" + source;
819 else
820 {
821 char buf[FILENAME_MAX];
822 char *cwd = getcwd( buf, FILENAME_MAX );
823 if( !cwd )
824 {
825 XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
826 std::cerr << st.GetErrorMessage() << std::endl;
827 return st.GetShellCode();
828 }
829 source = "file://" + std::string( cwd ) + '/' + source;
830 }
831 }
832
833 AppendCGI( source, config.srcOpq );
834
835 std::string sourcePathObf = sourceFile->Path;
836 std::string destPathObf = dest;
837 if( unlikely(log->GetLevel() >= Log::DumpMsg) ) {
838 sourcePathObf = obfuscateAuth(sourcePathObf);
839 destPathObf = obfuscateAuth(destPathObf);
840 }
841 log->Dump( AppMsg, "Processing source entry: %s, type %s, target file: %s, logLevel = %d",
842 sourcePathObf.c_str(), FileType2String( sourceFile->Protocol ),
843 destPathObf.c_str(), log->GetLevel() );
844
845 //--------------------------------------------------------------------------
846 // Set up the job
847 //--------------------------------------------------------------------------
848 std::string target = dest;
849
850
851 bool srcIsDir = false;
852 // if this is local file, for a directory Dlen + Doff will overlap with path size
853 if( strncmp( sourceFile->ProtName, "file", 4 ) == 0 )
854 srcIsDir = std::string( sourceFile->Path ).size() == size_t( sourceFile->Doff + sourceFile->Dlen );
855 // otherwise we are handling a remote file
856 else
857 srcIsDir = remoteSrcIsDir;
858 // if this is a recursive copy make sure we preserve the directory structure
859 if( config.Want( XrdCpConfig::DoRecurse ) && srcIsDir )
860 {
861 // get the source directory
862 std::string srcDir( sourceFile->Path, sourceFile->Doff );
863 // remove the trailing slash
864 if( srcDir[srcDir.size() - 1] == '/' )
865 srcDir = srcDir.substr( 0, srcDir.size() - 1 );
866 size_t diroff = srcDir.rfind( '/' );
867 // if there is no '/' it means a directory name has been given as relative path
868 if( diroff == std::string::npos ) diroff = 0;
869 target += '/';
870 target += sourceFile->Path + diroff;
871 // remove the filename from destination path as it will be appended later anyway
872 target = target.substr( 0 , target.rfind('/') );
873 }
874 AppendCGI( target, config.dstOpq );
875
876 properties.Set( "source", source );
877 properties.Set( "target", target );
878 properties.Set( "force", force );
879 properties.Set( "posc", posc );
880 properties.Set( "coerce", coerce );
881 properties.Set( "makeDir", makedir );
882 properties.Set( "dynamicSource", dynSrc );
883 properties.Set( "thirdParty", thirdParty );
884 properties.Set( "checkSumMode", checkSumMode );
885 properties.Set( "checkSumType", checkSumType );
886 properties.Set( "checkSumPreset", checkSumPreset );
887 properties.Set( "chunkSize", chunkSize );
888 properties.Set( "parallelChunks", parallelChunks );
889 properties.Set( "zipArchive", zip );
890 properties.Set( "xcp", xcp );
891 properties.Set( "xcpBlockSize", blockSize );
892 properties.Set( "delegate", delegate );
893 properties.Set( "targetIsDir", targetIsDir );
894 properties.Set( "preserveXAttr", preserveXAttr );
895 properties.Set( "xrate", config.xRate );
896 properties.Set( "xrateThreshold", config.xRateThreshold );
897 properties.Set( "rmOnBadCksum", rmOnBadCksum );
898 properties.Set( "continue", continue_ );
899 properties.Set( "zipAppend", zipappend );
900 properties.Set( "addcksums", config.AddCksVal );
901 properties.Set( "doServer", doserver );
902
903 if( zip )
904 properties.Set( "zipSource", zipFile );
905
906 if( xcp )
907 properties.Set( "nbXcpSources", nbSources );
908
909
910 XRootDStatus st = process.AddJob( properties, results );
911 if( !st.IsOK() )
912 {
913 std::cerr << "AddJob " << source << " -> " << target << ": ";
914 std::cerr << st.ToStr() << std::endl;
915 }
916 resultVect.push_back( results );
917 sourceFile = sourceFile->Next;
918 }
919
920 //----------------------------------------------------------------------------
921 // Configure the copy process
922 //----------------------------------------------------------------------------
923 PropertyList processConfig;
924 processConfig.Set( "jobType", "configuration" );
925 processConfig.Set( "parallel", config.Parallel );
926 process.AddJob( processConfig, 0 );
927
928 //----------------------------------------------------------------------------
929 // Prepare and run the copy process
930 //----------------------------------------------------------------------------
931 XRootDStatus st = process.Prepare();
932 if( !st.IsOK() )
933 {
934 CleanUpResults( resultVect );
935 std::cerr << "Prepare: " << st.ToStr() << std::endl;
936 return st.GetShellCode();
937 }
938
939 st = process.Run( &progress );
940 if( !st.IsOK() )
941 {
942 if( resultVect.size() == 1 )
943 std::cerr << "Run: " << st.ToStr() << std::endl;
944 else
945 {
946 std::vector<XrdCl::PropertyList*>::iterator it;
947 uint16_t i = 1;
948 uint16_t jobsRun = 0;
949 uint16_t errors = 0;
950 for( it = resultVect.begin(); it != resultVect.end(); ++it, ++i )
951 {
952 if( !(*it)->HasProperty( "status" ) )
953 continue;
954
955 XRootDStatus st = (*it)->Get<XRootDStatus>("status");
956 if( !st.IsOK() )
957 {
958 std::cerr << "Job #" << i << ": " << st.ToStr();
959 ++errors;
960 }
961 ++jobsRun;
962 }
963 std::cerr << "Jobs total: " << resultVect.size();
964 std::cerr << ", run: " << jobsRun;
965 std::cerr << ", errors: " << errors << std::endl;
966 }
967 CleanUpResults( resultVect );
968 return st.GetShellCode();
969 }
970 CleanUpResults( resultVect );
971 return 0;
972}
973
@ kXR_NotAuthorized
@ kXR_NotFound
bool AllOptionsSupported(XrdCpConfig *config)
Definition XrdClCopy.cc:310
int main(int argc, char **argv)
Definition XrdClCopy.cc:472
const char * FileType2String(XrdCpFile::PType type)
Definition XrdClCopy.cc:366
void ProcessCommandLineEnv(XrdCpConfig *config)
Definition XrdClCopy.cc:344
void CleanUpResults(std::vector< XrdCl::PropertyList * > &results)
Definition XrdClCopy.cc:462
XrdCpFile * IndexRemote(XrdCl::FileSystem *fs, std::string basePath, long dirOffset)
Definition XrdClCopy.cc:412
void AdjustFileInfo(XrdCpFile *file)
Definition XrdClCopy.cc:394
uint32_t CountSources(XrdCpFile *file)
Definition XrdClCopy.cc:384
void AppendCGI(std::string &url, const char *newCGI)
Definition XrdClCopy.cc:324
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void PrintAdditionalCheckSum(bool print)
Definition XrdClCopy.cc:284
void PrintSourceCheckSum(bool print)
Definition XrdClCopy.cc:282
virtual void EndJob(uint16_t jobNum, const XrdCl::PropertyList *results)
End job.
Definition XrdClCopy.cc:88
void PrintProgressBar(bool print)
Definition XrdClCopy.cc:281
void PrintCheckSum(const XrdCl::URL *url, const std::string &checkSum, uint64_t size)
Print the checksum.
Definition XrdClCopy.cc:256
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
Job progress.
Definition XrdClCopy.cc:221
std::string GetProgressBar(time_t now)
Get progress bar.
Definition XrdClCopy.cc:148
virtual void BeginJob(uint16_t jobNum, uint16_t jobTotal, const XrdCl::URL *source, const XrdCl::URL *destination)
Begin job.
Definition XrdClCopy.cc:61
std::string GetSummaryBar(time_t now)
Get sumary bar.
Definition XrdClCopy.cc:190
ProgressDisplay()
Constructor.
Definition XrdClCopy.cc:53
void PrintTargetCheckSum(bool print)
Definition XrdClCopy.cc:283
static int mapError(int rc)
Copy the data from one point to another.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
const std::string & GetName() const
Get file name.
StatInfo * GetStatInfo()
Get the stat info object.
Iterator End()
Get the end iterator.
Iterator Begin()
Get the begin iterator.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition XrdClEnv.cc:52
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus DirList(const std::string &path, DirListFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Handle diagnostics.
Definition XrdClLog.hh:101
void SetLevel(LogLevel level)
Set the level of the messages that should be sent to the destination.
Definition XrdClLog.hh:193
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
bool Stop()
Stop the postmaster.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Object stat info.
bool TestFlags(uint32_t flags) const
Test flags.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
std::string GetPathWithParams() const
Get the path with params.
Definition XrdClURL.cc:318
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
bool IsLocalFile() const
Definition XrdClURL.cc:474
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
static std::string BytesToString(uint64_t bytes)
Convert bytes to a human readable string.
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
short Doff
Definition XrdCpFile.hh:46
PType Protocol
Definition XrdCpFile.hh:49
char * Path
Definition XrdCpFile.hh:45
char ProtName[8]
Definition XrdCpFile.hh:50
XrdCpFile * Next
Definition XrdCpFile.hh:44
short Dlen
Definition XrdCpFile.hh:47
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.
int GetShellCode() const
Get the status code that may be returned to the shell.
const char * vName
defVar * intDefs
void Config(int argc, char **argv, int Opts=0)
std::vector< std::string > AddCksVal
const char * dstOpq
static const uint64_t DoZipMtlnCksum
XrdCpFile * srcFile
XrdCpFile * dstFile
char * zipFile
static const uint64_t DoNoPbar
static const uint64_t DoCoerce
static const uint64_t DoForce
static const uint64_t DoRmOnBadCksum
static const uint64_t DoNoTlsOK
static const uint64_t DoTpc
static const uint64_t DoCksum
defVar * strDefs
static const uint64_t DoCksrc
static const uint64_t DoTpcDlgt
static const uint64_t DoZip
static const uint64_t DoContinue
const char * CksVal
static const uint64_t DoRecurse
const char * srcOpq
static const uint64_t DoZipAppend
static const uint64_t DoDynaSrc
int Want(uint64_t What)
long long xRate
static const uint64_t DoSources
static const uint64_t DoXAttr
static const uint64_t DoTlsMLF
static const int optRmtRec
std::string RetryPolicy
static const uint64_t DoPath
static const uint64_t DoPosc
long long xRateThreshold
static const uint64_t DoTpcOnly
static const uint64_t DoTlsNoData
static const uint64_t DoServer