CGEM BOSS 6.6.5.g
BESIII Offline Software System
Loading...
Searching...
No Matches
DimRpcReader.cxx
Go to the documentation of this file.
6#include "dic.hxx"
7#include <iostream>
8
9pthread_mutex_t DimRpcReader::m_rpcLock = PTHREAD_MUTEX_INITIALIZER;
10
12{
13 if ( name.empty() ) {
14 throw RawExMessage("[NetDataReader] The name of DistBoss EvtServer was not set!");
15 }
16
18
19 m_buffer = new AutoEnlargeBuffer(128*1024);
20
21 m_rpc = new DimRpcInfo(name.c_str(), 5, DistBossCode::ServerTimeout);
22}
23
25{
26 delete m_rpc;
27}
28
29const uint32_t* DimRpcReader::nextEvent()
30{
31 static int nn = 0;
32 ++nn;
33
34 int theCode = DistBossCode::GetEvent;
35
36 for ( int i = 1; i < 7; ++i ) {
37 pthread_mutex_lock( &m_rpcLock );
38
39 m_outCode = theCode;
40 m_rpc->setData(m_outCode);
41 int size = m_rpc->getSize();
42 void* data = m_rpc->getData();
43 m_buffer->copy(data, size);
44
45 pthread_mutex_unlock( &m_rpcLock );
46
47 if ( size > 4 ) {
48 return (const uint32_t*)m_buffer->data();
49 }
50 else if ( size == 4 ) {
51 m_inCode = *((const uint32_t*)m_buffer->data());
52 if ( m_inCode == DistBossCode::NoMoreEvents ) {
53 throw RawExMessage("[NetDataReader] Reach the end, no more events left.");
54 }
55 else if ( m_inCode == DistBossCode::ServerTimeout) {
56 if ( i < 6 ) {
57 int sec = i;
58 std::cout << "[NetDataReader] Event " << nn << " timeout. Sleep " << sec << "s before retry." << std::endl;
59 sleep(sec);
60 std::cout << "[NetDataReader] Event " << nn << " now retry time " << i << " ..." << std::endl;
62 continue;
63 }
64 else {
65 throw RawExMessage("[NetDataReader] Failed to retry server. Stop this client!");
66 }
67 }
68 else if ( m_inCode == DistBossCode::ServerError ) {
69 throw RawExMessage("[NetDataReader] DistBossServer ERROR !!!");
70 }
71 else {
72 throw RawExMessage("[NetDataReader] Unknown server code !!!");
73 }
74 }
75 else {
76 throw RawExMessage("[NetDataReader] Invalid data from server !!!");
77 }
78
79 break;
80 }
81
82 return 0;
83}
84
85const uint32_t* DimRpcReader::currentEvent() const
86{
87 return (const uint32_t*)m_buffer->data();
88}
89
91{
92 //FIXME: this is a place holder for runNo()
93 //fill it in the future
94 return 0xFFFFFFFF;
95}
96
98{
99 pthread_mutex_lock( &m_rpcLock );
100
101 m_outCode = DistBossCode::GetFileName;
102 m_rpc->setData(m_outCode);
103 // should check status code here, correct it in future
104 std::string fname((char*)m_rpc->getData());
105
106 pthread_mutex_unlock( &m_rpcLock );
107
108 return fname;
109}
110
112{
113 return 0;
114}
TTree * data
void copy(void *src, int size)
static void registerInstance()
DimRpcReader(ReaderArgType &name)
uint32_t stat()
uint32_t runNo()
const uint32_t * nextEvent()
const uint32_t * currentEvent() const
std::string currentFile()
virtual ~DimRpcReader()
const std::string ReaderArgType
Definition: DimRpcReader.h:15