XRootD
Loading...
Searching...
No Matches
XrdClEcHandler.hh
Go to the documentation of this file.
1/*
2 * XrdClEcHandler.hh
3 *
4 * Created on: 23 Mar 2021
5 * Author: simonm
6 */
7
8#ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9#define SRC_XRDCL_XRDCLECHANDLER_HH_
10
12#include "XrdCl/XrdClUtils.hh"
15
16#include "XrdEc/XrdEcReader.hh"
18
19#include "XrdOuc/XrdOucCRC.hh"
21
#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
27
28namespace XrdCl
29{
// Free-space record for one server, identified by its address.
//
// The ordering is deliberately inverted: `a < b` is true when `a` has MORE
// free space than `b`, so sorting with the default comparator puts the
// servers with the most free space first.
class FreeSpace {
  public:
    std::string address;        // server address
    uint64_t    freeSpace = 0;  // free space recorded for the server; 0 until set

    // Returns true when this entry has strictly more free space than `a`.
    bool operator<(const FreeSpace &a) const
    {
      return freeSpace > a.freeSpace;
    }

    // Prints "<address> : <freeSpace>" followed by a newline to stdout.
    void Dump() const
    {
      std::cout << address << " : " << freeSpace << std::endl;
    }
};
44
46 public:
49 // From the old location list, select a new location list
50 // n: select at least "n" nodes in the new location list
52 XrdCl::LocationInfo &newList,
53 uint32_t n);
54 void Dump();
55 private:
56 std::vector<FreeSpace> ServerList;
57 std::vector<std::string> ExportPaths;
58 time_t lastUpdateT = 0;
59 int xRatio = 10;
60 std::mutex lock;
61 bool initExportPaths = false;
62
63 void TryInitExportPaths();
64 uint64_t GetFreeSpace(const std::string addr);
65 bool BlindSelect();
66 void UpdateSpaceInfo();
68 void AddServers(XrdCl::LocationInfo &locInfo);
69 };
70
72 {
73 private:
74 XrdCl::ResponseHandler *realHandler;
75 public:
76 // constructor
78
79 // Response Handler
81 AnyObject *rdresp)
82 {
83 if( !status->IsOK() )
84 {
85 realHandler->HandleResponse( status, rdresp );
86 delete this;
87 return;
88 }
89
90 ChunkInfo *chunk = 0;
91 rdresp->Get(chunk);
92
93 if (!chunk) {
94 delete this;
95 return;
96 }
97
98 std::vector<uint32_t> cksums;
99 size_t nbpages = chunk->length / XrdSys::PageSize;
100 if( chunk->length % XrdSys::PageSize )
101 ++nbpages;
102 cksums.reserve( nbpages );
103
104 size_t size = chunk->length;
105 char *buffer = reinterpret_cast<char*>( chunk->buffer );
106
107 for( size_t pg = 0; pg < nbpages; ++pg )
108 {
109 size_t pgsize = XrdSys::PageSize;
110 if( pgsize > size ) pgsize = size;
111 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
112 cksums.push_back( crcval );
113 buffer += pgsize;
114 size -= pgsize;
115 }
116
117 PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
118 delete rdresp;
119 AnyObject *response = new AnyObject();
120 response->Set( pages );
121 realHandler->HandleResponse( status, response );
122
123 delete this;
124 }
125 };
126
127 class EcHandler : public FilePlugIn
128 {
129 public:
// Construct a handler for the object addressed by `redir`.
//
//   redir     - URL kept by the handler; also used to build the `fs` member
//   objcfg    - object configuration; the raw pointer is stored in a
//               std::unique_ptr member, so the handler takes ownership
//   cksHelper - checksum helper moved into the handler; may be null
//               (CreateFile in this file passes nullptr)
//
// The write offset `curroff` starts at 0.
130 EcHandler( const URL &redir,
131 XrdEc::ObjCfg *objcfg,
132 std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
133 fs( redir, false ),
134 objcfg( objcfg ),
135 curroff( 0 ),
136 cksHelper( std::move( cksHelper ) )
137 {
// NOTE(review): original line 138 (the constructor body) is not present in
// this listing - confirm against the real header before editing.
139 }
140
141 virtual ~EcHandler()
142 {
143 }
144
// Open the object for writing or for reading, depending on `flags`.
//
// Write/Update flags create an XrdEc::StrmWriter; the Read flag creates an
// XrdEc::Reader. In both cases `handler` and `timeout` are handed to the
// stream's Open() and an OK status is returned from here. If no placement
// is configured (objcfg->plgr is empty) it is loaded first, and a failure
// to load it is returned directly.
145 XRootDStatus Open( uint16_t flags,
146 ResponseHandler *handler,
147 uint16_t timeout )
148 {
149 if( ( flags & OpenFlags::Write ) || ( flags & OpenFlags::Update ) )
150 {
// Flag validation for the write path.
// NOTE(review): the statement guarded by this condition (original line 154)
// is missing from this listing.
151 if( !( flags & OpenFlags::New ) || // it has to be a new file
152 ( flags & OpenFlags::Delete ) || // truncation is not supported
153 ( flags & OpenFlags::Read ) ) // write + read is not supported
155
// No placement given: select the target servers for a new object.
156 if( objcfg->plgr.empty() )
157 {
158 XRootDStatus st = LoadPlacement();
159 if( !st.IsOK() ) return st;
160 }
161 writer.reset( new XrdEc::StrmWriter( *objcfg ) );
162 writer->Open( handler, timeout );
163 return XRootDStatus();
164 }
165
166 if( flags & OpenFlags::Read )
167 {
// NOTE(review): the Write flag was already handled by the branch above, so
// this test looks unreachable-as-true; its guarded statement (original
// line 169) is missing from this listing.
168 if( flags & OpenFlags::Write )
170
// No placement given: locate the servers that hold the existing object.
171 if( objcfg->plgr.empty() )
172 {
173 XRootDStatus st = LoadPlacement( redir.GetPath() );
174 if( !st.IsOK() ) return st;
175 }
176 reader.reset( new XrdEc::Reader( *objcfg ) );
177 reader->Open( handler, timeout );
178 return XRootDStatus();
179 }
180
// NOTE(review): the fall-through return (original line 181) is missing from
// this listing.
182 }
183
184 XRootDStatus Open( const std::string &url,
185 OpenFlags::Flags flags,
186 Access::Mode mode,
187 ResponseHandler *handler,
188 uint16_t timeout )
189 {
190 (void)url; (void)mode;
191 return Open( flags, handler, timeout );
192 }
193
194
195 //------------------------------------------------------------------------
197 //------------------------------------------------------------------------
199 uint16_t timeout )
200 {
// Closes whichever stream is open. The stream object is reset inside the
// completion callback, and `handler` receives the final result. Returns
// errNotSupported when neither a writer nor a reader exists.
201 if( writer )
202 {
203 writer->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
204 {
205 writer.reset();
// A commit query is only sent when the close succeeded AND a checksum
// helper is present.
206 if( st->IsOK() && bool( cksHelper ) )
207 {
// Opaque commit request: object id, close marker and the final size.
208 std::string commit = redir.GetPath()
209 + "?xrdec.objid=" + objcfg->obj
210 + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
// NOTE(review): cksHelper was already tested in the enclosing condition,
// so this check is always true here.
211 if( cksHelper )
212 {
213 std::string ckstype = cksHelper->GetType();
214 std::string cksval;
// This `st` shadows the callback parameter of the same name.
215 auto st = cksHelper->GetCheckSum( cksval, ckstype );
216 if( !st.IsOK() )
217 {
218 handler->HandleResponse( new XRootDStatus( st ), nullptr );
219 return;
220 }
221 commit += "&xrdec.cksum=" + cksval;
222 }
223 Buffer arg; arg.FromString( commit );
// `handler` is passed to the query; it is called from here only if the
// query could not be issued.
224 auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
225 if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
// NOTE(review): on every return inside this branch the st/rsp pair received
// from writer->Close is neither forwarded nor deleted here - confirm who
// owns them for the pointer-taking Wrap overload.
226 return;
227 }
228 handler->HandleResponse( st, rsp );
229 } ), timeout );
230 return XRootDStatus();
231 }
232
233 if( reader )
234 {
235 reader->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
236 {
237 reader.reset();
238 handler->HandleResponse( st, rsp );
239 } ), timeout );
240 return XRootDStatus();
241 }
242
243 return XRootDStatus( stError, errNotSupported );
244 }
245
246 //------------------------------------------------------------------------
248 //------------------------------------------------------------------------
249 XRootDStatus Stat( bool force,
250 ResponseHandler *handler,
251 uint16_t timeout )
252 {
253
254 if( !objcfg->nomtfile )
255 return fs.Stat( redir.GetPath(), handler, timeout );
256
257 if( !force && statcache )
258 {
259 auto rsp = StatRsp( statcache->GetSize() );
260 Schedule( handler, rsp );
261 return XRootDStatus();
262 }
263
264 if( writer )
265 {
266 statcache.reset( new StatInfo() );
267 statcache->SetSize( writer->GetSize() );
268 auto rsp = StatRsp( statcache->GetSize() );
269 Schedule( handler, rsp );
270 return XRootDStatus();
271 }
272
273 if( reader )
274 {
275 statcache.reset( new StatInfo() );
276 statcache->SetSize( reader->GetSize() );
277 auto rsp = StatRsp( statcache->GetSize() );
278 Schedule( handler, rsp );
279 return XRootDStatus();
280 }
281
282 return XRootDStatus( stError, errInvalidOp, 0, "File not open." );
283 }
284
285 //------------------------------------------------------------------------
287 //------------------------------------------------------------------------
288 XRootDStatus Read( uint64_t offset,
289 uint32_t size,
290 void *buffer,
291 ResponseHandler *handler,
292 uint16_t timeout )
293 {
294 if( !reader ) return XRootDStatus( stError, errInternal );
295
296 reader->Read( offset, size, buffer, handler, timeout );
297 return XRootDStatus();
298 }
299
300 //------------------------------------------------------------------------
302 //------------------------------------------------------------------------
303 XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer,
304 ResponseHandler *handler,
305 uint16_t timeout)
306 {
307 ResponseHandler *substitHandler = new EcPgReadResponseHandler( handler );
308 XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
309 return st;
310 }
311
312
313 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
316 XRootDStatus Write( uint64_t offset,
317 uint32_t size,
318 const void *buffer,
319 ResponseHandler *handler,
320 uint16_t timeout )
321 {
322 if( cksHelper )
323 cksHelper->Update( buffer, size );
324
325 if( !writer ) return XRootDStatus( stError, errInternal );
326 if( offset != curroff ) return XRootDStatus( stError, errNotSupported );
327 writer->Write( size, buffer, handler );
328 curroff += size;
329 return XRootDStatus();
330 }
331
332 //------------------------------------------------------------------------
334 //------------------------------------------------------------------------
335 XRootDStatus PgWrite( uint64_t offset,
336 uint32_t size,
337 const void *buffer,
338 std::vector<uint32_t> &cksums,
339 ResponseHandler *handler,
340 uint16_t timeout = 0 )
341 {
342 if(! cksums.empty() )
343 {
344 const char *data = static_cast<const char*>( buffer );
345 std::vector<uint32_t> local_cksums;
346 XrdOucPgrwUtils::csCalc( data, offset, size, local_cksums );
347 if (data) delete data;
348 if (local_cksums != cksums)
349 return XRootDStatus( stError, errInvalidArgs, 0, "data and crc32c digests do not match." );
350 }
351 return Write(offset, size, buffer, handler, timeout);
352 }
353
354 //------------------------------------------------------------------------
356 //------------------------------------------------------------------------
357 bool IsOpen() const
358 {
359 return writer || reader;
360 }
361
362 private:
363
364 inline XRootDStatus LoadPlacement()
365 {
366 LocationInfo *infoAll = nullptr;
367 XRootDStatus st = fs.DeepLocate( "*", OpenFlags::PrefName, infoAll );
368 std::unique_ptr<LocationInfo> ptr( infoAll );
369 if( !st.IsOK() ) return st;
370
371 LocationInfo *info = new LocationInfo();
372 std::unique_ptr<LocationInfo> ptr1( info );
373
374 static ServerSpaceInfo ssi;
375 ssi.SelectLocations(*infoAll, *info, objcfg->nbchunks);
376
377 if( info->GetSize() < objcfg->nbchunks )
378 return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
379 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
380 shuffle (info->Begin(), info->End(), std::default_random_engine(seed));
381 for( size_t i = 0; i < objcfg->nbchunks; ++i )
382 {
383 auto &location = info->At( i );
384 objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
385 }
386 return XRootDStatus();
387 }
388
389 inline XRootDStatus LoadPlacement( const std::string &path )
390 {
391 LocationInfo *info = nullptr;
392 XRootDStatus st = fs.DeepLocate( "*", OpenFlags::PrefName, info );
393 std::unique_ptr<LocationInfo> ptr( info );
394 if( !st.IsOK() ) return st;
395 // The following check become meaningless
396 if( info->GetSize() < objcfg->nbdata )
397 return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
398
399 uint64_t verNumMax = 0;
400 std::vector<uint64_t> verNums;
401 std::vector<std::string> xattrkeys;
402 std::vector<XrdCl::XAttr> xattrvals;
403 xattrkeys.push_back("xrdec.strpver");
404 for( size_t i = 0; i < info->GetSize(); ++i )
405 {
406 FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress());
407 xattrvals.clear();
408 st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
409 if (st.IsOK() && ! xattrvals[0].value.empty())
410 {
411 std::stringstream sstream(xattrvals[0].value);
412 uint64_t verNum;
413 sstream >> verNum;
414 verNums.push_back(verNum);
415 if (verNum > verNumMax)
416 verNumMax = verNum;
417 }
418 else
419 verNums.push_back(0);
420 delete fs_i;
421 }
422
423 int n = 0;
424 for( size_t i = 0; i < info->GetSize(); ++i )
425 {
426 if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
427 continue;
428 else
429 n++;
430 auto &location = info->At( i );
431 objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
432 }
433 if (n < objcfg->nbdata )
434 return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
435 return XRootDStatus();
436 }
437
438 inline static AnyObject* StatRsp( uint64_t size )
439 {
440 StatInfo *info = new StatInfo();
441 info->SetSize( size );
442 AnyObject *rsp = new AnyObject();
443 rsp->Set( info );
444 return rsp;
445 }
446
// Wrap `handler`, a fresh default-constructed XRootDStatus and `rsp` in a
// ResponseJob.
447 inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
448 {
449 ResponseJob *job = new ResponseJob( handler, new XRootDStatus(), rsp, nullptr );
// NOTE(review): original line 450, which presumably queues `job`, is
// missing from this listing - confirm against the real header.
451 }
452
// URL given at construction; its path is used for stat, xattr and commit requests
453 URL redir;
// file system object built from redir
454 FileSystem fs;
// object configuration, owned by the handler
455 std::unique_ptr<XrdEc::ObjCfg> objcfg;
// set by Open() for writing, reset when Close() completes
456 std::unique_ptr<XrdEc::StrmWriter> writer;
// set by Open() for reading, reset when Close() completes
457 std::unique_ptr<XrdEc::Reader> reader;
// bytes written so far; the only offset Write() accepts
458 uint64_t curroff;
// optional checksum helper fed by Write(); may be null
459 std::unique_ptr<CheckSumHelper> cksHelper;
// size cached by Stat()
460 std::unique_ptr<StatInfo> statcache;
461
462 };
463
464 //----------------------------------------------------------------------------
466 //----------------------------------------------------------------------------
468 {
469 public:
470 //------------------------------------------------------------------------
472 //------------------------------------------------------------------------
473 EcPlugInFactory( uint8_t nbdta, uint8_t nbprt, uint64_t chsz,
474 std::vector<std::string> && plgr ) :
475 nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
476 {
477 }
478
479 //------------------------------------------------------------------------
481 //------------------------------------------------------------------------
483 {
484 }
485
486 //------------------------------------------------------------------------
488 //------------------------------------------------------------------------
489 virtual FilePlugIn *CreateFile( const std::string &u )
490 {
491 URL url( u );
492 XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( url.GetPath(), nbdta, nbprt,
493 chsz, false, true );
494 objcfg->plgr = std::move( plgr );
495 return new EcHandler( url, objcfg, nullptr );
496 }
497
498 //------------------------------------------------------------------------
500 //------------------------------------------------------------------------
501 virtual FileSystemPlugIn *CreateFileSystem( const std::string &url )
502 {
503 return nullptr;
504 }
505
506 private:
507 uint8_t nbdta;
508 uint8_t nbprt;
509 uint64_t chsz;
510 std::vector<std::string> plgr;
511 };
512
513 EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl );
514
515} /* namespace XrdCl */
516
517#endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
518
bool Exists
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
static PostMaster * GetPostMaster()
Get default post master.
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
bool IsOpen() const
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
EcPgReadResponseHandler(ResponseHandler *a)
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
virtual ~EcPlugInFactory()
Destructor.
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
An interface for file plug-ins.
An interface for file plug-ins.
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool operator<(const FreeSpace &a) const
std::string address
void Dump() const
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Path location info.
uint32_t GetSize() const
Get number of locations.
Iterator Begin()
Get the location begin iterator.
Location & At(uint32_t index)
Get the location at index.
Iterator End()
Get the location end iterator.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
Object stat info.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
static Config & Instance()
Singleton access.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
const uint16_t errInvalidOp
const uint16_t errInvalidArgs
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint16_t errNotSupported
static const int PageSize
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
@ OpaqueFile
Implementation dependent.
bool IsOK() const
We're fine.
std::vector< std::string > plgr