XRootD
Loading...
Searching...
No Matches
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28
29#include <memory>
30
31namespace XrdCl
32{
33 //----------------------------------------------------------------------------
35 //----------------------------------------------------------------------------
37 {
38 public:
39 //------------------------------------------------------------------------
47 //------------------------------------------------------------------------
49 Socket &socket,
50 const std::string &strmname,
51 Stream &strm,
52 uint16_t substrmnb) : readstage( ReadStart ),
53 xrdTransport( xrdTransport ),
54 socket( socket ),
55 strmname( strmname ),
56 strm( strm ),
57 substrmnb( substrmnb ),
58 inmsgsize( 0 ),
59 inhandler( nullptr )
60 {
61 }
62
63 //------------------------------------------------------------------------
65 //------------------------------------------------------------------------
66 virtual ~AsyncMsgReader(){ }
67
68 //------------------------------------------------------------------------
70 //------------------------------------------------------------------------
71 inline void Reset()
72 {
73 readstage = ReadStart;
74 inmsg.reset();
75 inmsgsize = 0;
76 inhandler = nullptr;
77 }
78
79 //------------------------------------------------------------------------
// Read out the response from the socket.
//
// The body below is a state machine driven by the 'readstage' member. Each
// case performs one step and then either selects the next stage and loops
// again, or hands the status back to the caller when the step failed or
// reported suRetry. 'readstage' is left untouched on such a return, so a
// later call picks up at the stage where the previous one stopped. Once
// ReadDone has passed the message to the stream the loop is left and a
// default-constructed XRootDStatus is returned.
//
// NOTE(review): the line carrying the function signature is not visible in
// this listing.
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85
86 while( true )
87 {
88 switch( readstage )
89 {
90 //------------------------------------------------------------------
91 // There is no incoming message currently being processed so we
92 // create a new one
93 //------------------------------------------------------------------
94 case ReadStart:
95 {
96 inmsg = std::make_shared<Message>();
97 //----------------------------------------------------------------
98 // The next step is to read the header
99 //----------------------------------------------------------------
100 readstage = ReadHeader;
101 continue;
102 }
103 //------------------------------------------------------------------
104 // We need to read the header
105 //------------------------------------------------------------------
106 case ReadHeader:
107 {
108 XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
109 if( !st.IsOK() || st.code == suRetry )
110 return st;
111
112 log->Dump( AsyncSockMsg, "[%s] Received message header for %p size: %d",
113 strmname.c_str(), (void*)inmsg.get(), inmsg->GetCursor() );
114
// attn responses are not dispatched yet: the action code that follows
// the header has to be read first (ReadAttn stage)
115 ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116 if( rsp->hdr.status == kXR_attn )
117 {
118 log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119 "of message %p", strmname.c_str(), (void*)inmsg.get() );
120 inmsg->ReAllocate( 16 ); // header (8 bytes) + action code (8 bytes)
121 readstage = ReadAttn;
122 continue;
123 }
124
// Ask the stream for a handler for this message; a non-null result
// means the body is read through that handler (ReadRawData stage)
125 inmsgsize = inmsg->GetCursor();
126 inhandler = strm.InstallIncHandler( inmsg, substrmnb );
127
128 if( inhandler )
129 {
130 log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131 "of message %p", strmname.c_str(), (void*)inmsg.get() );
132 //--------------------------------------------------------------
133 // The next step is to read raw data
134 //--------------------------------------------------------------
135 readstage = ReadRawData;
136 continue;
137 }
138
139 //----------------------------------------------------------------
140 // The next step is to read the message body
141 //----------------------------------------------------------------
142 readstage = ReadMsgBody;
143 continue;
144 }
145 //------------------------------------------------------------------
146 // Before proceeding we need to figure out the attn action code
147 //------------------------------------------------------------------
148 case ReadAttn:
149 {
150 XRootDStatus st = ReadAttnActnum();
151 if( !st.IsOK() || st.code == suRetry )
152 return st;
153
154 //----------------------------------------------------------------
155 // There is an embedded response, overwrite the message with that
156 //----------------------------------------------------------------
157 if( HasEmbeddedRsp() )
158 {
159 inmsg->Free();
160 readstage = ReadHeader;
161 continue;
162 }
163
164 //----------------------------------------------------------------
165 // Readout the rest of the body
166 //----------------------------------------------------------------
167 inmsgsize = inmsg->GetCursor();
168 readstage = ReadMsgBody;
169 continue;
170 }
171 //------------------------------------------------------------------
172 // kXR_status is special as it can have both body and raw data,
173 // handle it separately
174 //------------------------------------------------------------------
175 case ReadMore:
176 {
177 XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket );
178 if( !st.IsOK() || st.code == suRetry )
179 return st;
180 inmsgsize = inmsg->GetCursor();
181
182 //----------------------------------------------------------------
183 // The next step is to finalize the read
184 //----------------------------------------------------------------
185 readstage = ReadDone;
186 continue;
187 }
188 //------------------------------------------------------------------
189 // We need to call a raw message handler to get the data from the
190 // socket
191 //------------------------------------------------------------------
192 case ReadRawData:
193 {
194 uint32_t bytesRead = 0;
195 XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196 if( !st.IsOK() )
197 return st;
// The bytes are added to the total before the suRetry check, so data
// delivered by a call that asks for a retry is still counted
198 inmsgsize += bytesRead;
199 if( st.code == suRetry )
200 return st;
201 //----------------------------------------------------------------
202 // The next step is to finalize the read
203 //----------------------------------------------------------------
204 readstage = ReadDone;
205 continue;
206 }
207 //------------------------------------------------------------------
208 // No raw handler, so we read the message to the buffer
209 //------------------------------------------------------------------
210 case ReadMsgBody:
211 {
212 XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
213 if( !st.IsOK() || st.code == suRetry )
214 return st;
215 inmsgsize = inmsg->GetCursor();
216
217
218 //----------------------------------------------------------------
219 // kXR_status response needs special handling as it can have
220 // either (body + raw data) or (body + additional body data)
221 //----------------------------------------------------------------
222 if( IsStatusRsp() )
223 {
// 'action' is tested as a bit mask below; InspectStatusRsp receives
// 'inhandler' by reference and may therefore replace it
224 uint16_t action = strm.InspectStatusRsp( substrmnb,
225 inhandler );
226
227 if( action & MsgHandler::Corrupted )
// NOTE(review): listing line 228 is absent, so the statement guarded by
// the Corrupted check is not visible here. As shown, the check would
// guard the Raw test below. Presumably the hidden line returns an error
// status (stError / errCorruptedHeader are referenced by this file) -
// confirm against the original header.
229
230 if( action & MsgHandler::Raw )
231 {
232 //--------------------------------------------------------------
233 // The next step is to read the raw data
234 //--------------------------------------------------------------
235 readstage = ReadRawData;
236 continue;
237 }
238
239 if( action & MsgHandler::More )
240 {
241
242 //--------------------------------------------------------------
243 // The next step is to read the additional data in the message
244 // body
245 //--------------------------------------------------------------
246 readstage = ReadMore;
247 continue;
248 }
249 }
250
251 //----------------------------------------------------------------
252 // The next step is to finalize the read
253 //----------------------------------------------------------------
254 readstage = ReadDone;
255 continue;
256 }
257
258 case ReadDone:
259 {
260 //----------------------------------------------------------------
261 // Report the incoming message
262 //----------------------------------------------------------------
263 log->Dump( AsyncSockMsg, "[%s] Received message %p of %d bytes",
264 strmname.c_str(), (void*)inmsg.get(), inmsgsize );
265
// The message is moved into the by-value parameter of OnIncoming, which
// leaves 'inmsg' empty. This case has no 'continue', so control leaves
// the switch and reaches the 'break' below.
266 strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267 }
268 }
269 // just in case
270 break;
271 }
272
273 //----------------------------------------------------------------------
274 // We are done
275 //----------------------------------------------------------------------
276 return XRootDStatus();
277 }
278
279 private:
280
281 XRootDStatus ReadAttnActnum()
282 {
283 //----------------------------------------------------------------------
284 // Readout the action code from the socket. We are reading out 8 bytes
285 // into the message, the 8 byte header is already there.
286 //----------------------------------------------------------------------
287 size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
288 while( btsleft > 0 )
289 {
290 int btsrd = 0;
291 XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
292 if( !st.IsOK() || st.code == suRetry )
293 return st;
294 btsleft -= btsrd;
295 inmsg->AdvanceCursor( btsrd );
296 }
297
298 //----------------------------------------------------------------------
299 // Marshal the action code
300 //----------------------------------------------------------------------
301 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
302 attn->actnum = ntohl( attn->actnum );
303
304 return XRootDStatus();
305 }
306
307 inline bool IsStatusRsp()
308 {
309 ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer();
310 return ( hdr->status == kXR_status );
311 }
312
313 inline bool HasEmbeddedRsp()
314 {
315 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
316 return ( attn->actnum == kXR_asynresp );
317 }
318
319 //------------------------------------------------------------------------
// Stages of the read state machine. Each value names the step that the
// read routine performs next; the current one is kept in 'readstage'.
321 //------------------------------------------------------------------------
322 enum Stage
323 {
324 ReadStart, //!< the next step is to initialize the read
325 ReadHeader, //!< the next step is to read the header
326 ReadAttn, //!< the next step is to read attn action code
327 ReadMore, //!< the next step is to read more status body
328 ReadMsgBody, //!< the next step is to read the body
329 ReadRawData, //!< the next step is to read the raw data
330 ReadDone //!< the next step is to finalize the read
331 };
332
333 //------------------------------------------------------------------------
334 // Current read stage
335 //------------------------------------------------------------------------
336 Stage readstage;
337
338 //------------------------------------------------------------------------
339 // The context of the read operation
340 //------------------------------------------------------------------------
341 TransportHandler &xrdTransport;
342 Socket &socket;
343 const std::string &strmname;
344 Stream &strm;
345 uint16_t substrmnb;
346
347
348 //------------------------------------------------------------------------
349 // The internal state of the reader
350 //------------------------------------------------------------------------
351 std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
352 uint32_t inmsgsize;
353 MsgHandler *inhandler;
354
355 };
356
357} /* namespace XrdCl */
358
359#endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition XProtocol.hh:938
@ kXR_status
Definition XProtocol.hh:907
@ kXR_attn
Definition XProtocol.hh:901
ServerResponseHeader hdr
Utility class encapsulating reading response message logic.
void Reset()
Reset the state of the object (makes it ready to read out next msg)
XRootDStatus Read()
Read out the response from the socket.
virtual ~AsyncMsgReader()
Destructor.
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
@ More
there are more (non-raw) data to be read
A network socket.
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
Perform the handshake and the authentication for each physical stream.
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
const uint16_t suRetry
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t AsyncSockMsg
const uint16_t errCorruptedHeader
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.