libzypp  17.36.3
eventdispatcher_glib.cc
Go to the documentation of this file.
1 #include "timer.h"
3 #include "private/threaddata_p.h"
5 
9 #include <zypp-core/zyppng/base/UnixSignalSource>
10 
11 namespace zyppng {
12 
13 static int inline readMask () {
14  return ( G_IO_IN | G_IO_HUP );
15 }
16 
17 static int inline writeMask () {
18  return ( G_IO_OUT );
19 }
20 
21 static int inline excpMask () {
22  return ( G_IO_PRI );
23 }
24 
25 static int inline evModeToMask ( int mode ) {
26  int cond = 0;
27  if ( mode & AbstractEventSource::Read ) {
28  cond = readMask() | G_IO_ERR;
29  }
30  if ( mode & AbstractEventSource::Write ) {
31  cond = cond | writeMask() | G_IO_ERR;
32  }
33  if ( mode & AbstractEventSource::Exception ) {
34  cond = cond | excpMask() | G_IO_ERR;
35  }
36  return cond;
37 }
38 
39 static int inline gioConditionToEventTypes ( const GIOCondition rEvents, const int requestedEvs ) {
40  int ev = 0;
41  if ( ( rEvents & requestedEvs ) != 0 ) {
42  if ( ( rEvents & readMask() ) && ( requestedEvs & readMask() ) )
44  if ( ( rEvents & writeMask() ) && ( requestedEvs & writeMask() ) )
46  if ( ( rEvents & excpMask()) && ( requestedEvs & excpMask() ) )
48  if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
50  }
51  return ev;
52 }
53 
54 static GSourceFuncs abstractEventSourceFuncs = {
58  nullptr,
59  nullptr,
60  nullptr
61 };
62 
64  GAbstractEventSource *src = nullptr;
65  src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
66  (void) new (&src->pollfds) std::vector<GUnixPollFD>();
67 
68  src->eventSource = nullptr;
69  src->_ev = ev;
70  return src;
71 }
72 
74 {
75  for ( GUnixPollFD &fd : src->pollfds ) {
76  if ( fd.tag )
77  g_source_remove_unix_fd( &src->source, fd.tag );
78  }
79 
80  src->pollfds.clear();
81  src->pollfds.std::vector< GUnixPollFD >::~vector();
82  g_source_destroy( &src->source );
83  g_source_unref( &src->source );
84 }
85 
86 gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
87 {
88  //we can not yet determine if the GSource is ready, polling FDs also have no
89  //timeout, so lets continue
90  if ( timeout )
91  *timeout = -1;
92  return false;
93 }
94 
95 //here we need to figure out which FDs are pending
96 gboolean GAbstractEventSource::check( GSource *source )
97 {
98  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
99 
100  //check for pending and remove orphaned entries
101  bool hasPending = false;
102 
103  for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
104  if ( fdIt->tag == nullptr ) {
105  //this pollfd was removed, clear it from the list
106  //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
107  //next check it is removed for good
108  fdIt = src->pollfds.erase( fdIt );
109  } else {
110  GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
111  if ( pendEvents & G_IO_NVAL ){
112  //that poll is broken, do we need to do more????
113  fdIt = src->pollfds.erase( fdIt );
114  } else {
115  hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
116  fdIt++;
117  }
118  }
119  }
120 
121  //if the pollfds are empty trigger dispatch so this source can be removed
122  return hasPending || src->pollfds.empty();
123 }
124 
125 //Trigger all event sources that have been activated
126 gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
127 {
128  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
129 
130  if ( !src )
131  return G_SOURCE_REMOVE;
132 
133  //sources are only removed here so we do not accidentially mess with the pollfd iterator in the next loop
134  //were we trigger all ready FDs
135  if ( src->pollfds.empty() ) {
136  auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
137 
138  if ( it != src->_ev->_eventSources.end() ) {
140  src->_ev->_eventSources.erase( it );
141  return G_SOURCE_REMOVE;
142  }
143  }
144 
145  for ( const GUnixPollFD &pollfd : src->pollfds ) {
146  //do not trigger orphaned ones
147  if ( pollfd.tag != nullptr ) {
148  GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
149 
150  if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
151  int ev = gioConditionToEventTypes( pendEvents, pollfd.reqEvents );
152  // we require all event objects to be used in shared_ptr form, by doing this we make sure that the object is not destroyed
153  // while we still use it. However this WILL throw in case of using the EventSource outside of shared_ptr bounds
154  auto eventSourceLocked = src->eventSource->shared_this<AbstractEventSource>();
155  eventSourceLocked->onFdReady( pollfd.pollfd, ev );
156  }
157  }
158  }
159 
160  return G_SOURCE_CONTINUE;
161 }
162 
163 static GSourceFuncs glibTimerSourceFuncs = {
167  nullptr,
168  nullptr,
169  nullptr
170 };
171 
172 //check when this timer expires and set the correct timeout
173 gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
174 {
175  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
176  if ( !source )
177  return false; //not ready for dispatch
178 
179  if ( !source->_t )
180  return false;
181 
182  uint64_t nextTimeout = source->_t->remaining();
183  if ( timeout ) {
184  //this would be a really looong timeout, but be safe
185  if ( nextTimeout > G_MAXINT )
186  *timeout = G_MAXINT;
187  else
188  *timeout = static_cast<gint>( nextTimeout );
189  }
190  return ( nextTimeout == 0 );
191 }
192 
193 //this is essentially the same as prepare
194 gboolean GLibTimerSource::check(GSource *source)
195 {
196  return prepare( source, nullptr );
197 }
198 
199 //emit the expired timers, restart timers that are no single shots
200 gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
201 {
202  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
203  if ( !source )
204  return true;
205 
206  if ( source->_t == nullptr )
207  return true;
208  //this will emit the expired signal and reset the timer
209  //or stop it in case its a single shot timer
210  source->_t->shared_this<Timer>()->expire();
211  return true;
212 }
213 
215 {
216  GLibTimerSource *src = nullptr;
217  src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
218  src->_t = nullptr;
219  return src;
220 }
221 
223 {
224  g_source_destroy( &src->source );
225  g_source_unref( &src->source );
226 }
227 
231 static gboolean eventLoopIdleFunc ( gpointer user_data )
232 {
233  auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
234  if ( dPtr ) {
235  if( dPtr->runIdleTasks() ) {
236  return G_SOURCE_CONTINUE;
237  }
238 
239  g_source_unref ( dPtr->_idleSource );
240  dPtr->_idleSource = nullptr;
241  }
242  return G_SOURCE_REMOVE;
243 }
244 
246 {
247  source = g_child_watch_source_new( pid );
248 }
249 
251  : tag( other.tag )
252  , source( other.source )
253  , callback( std::move( other.callback ) )
254 {
255  other.source = nullptr;
256 }
257 
259 {
260  if ( source ) {
261  g_source_destroy( source );
262  g_source_unref( source );
263  }
264 }
265 
267 {
268  tag = other.tag;
269  source = other.source;
270  callback = std::move( other.callback );
271  other.source = nullptr;
272  return *this;
273 }
274 
276 {
277  _myThreadId = std::this_thread::get_id();
278 
279  //if we get a context specified ( usually when created for main thread ) we use it
280  //otherwise we create our own
281  if ( ctx ) {
282  _ctx = ctx;
283  g_main_context_ref ( _ctx );
284  } else {
285  _ctx = g_main_context_new();
286  }
287  // Enable this again once we switch to a full async API that requires a eventloop before calling any zypp functions
288  // g_main_context_push_thread_default( _ctx );
289 }
290 
292 {
293  std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
295  });
296  std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
298  });
299  _runningTimers.clear();
300  _eventSources.clear();
301 
302  if ( _idleSource ) {
303  g_source_destroy( _idleSource );
304  g_source_unref ( _idleSource );
305  }
306 
307  //g_main_context_pop_thread_default( _ctx );
308  g_main_context_unref( _ctx );
309 }
310 
312 {
313  //run all user defined idle functions
314  //if they return true, they are executed again in the next idle run
315  decltype ( _idleFuncs ) runQueue;
316  runQueue.swap( _idleFuncs );
317 
318  while ( runQueue.size() ) {
319  EventDispatcher::IdleFunction fun( std::move( runQueue.front() ) );
320  runQueue.pop();
321  if ( fun() )
322  _idleFuncs.push( std::move(fun) );
323  }
324 
325  //keep this as the last thing to call after all user code was executed
326  if ( _unrefLater.size() )
327  _unrefLater.clear();
328 
329  return _idleFuncs.size() || _unrefLater.size();
330 }
331 
333 {
334  if ( !_idleSource ) {
335  _idleSource = g_idle_source_new ();
336  g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
337  g_source_attach ( _idleSource, _ctx );
338  }
339 }
340 
341 std::shared_ptr<EventDispatcher> EventDispatcherPrivate::create()
342 {
343  return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
344 }
345 
346 void EventDispatcherPrivate::waitPidCallback( GPid pid, gint status, gpointer user_data )
347 {
348  EventDispatcherPrivate *that = reinterpret_cast<EventDispatcherPrivate *>( user_data );
349 
350  try {
351  auto data = std::move( that->_waitPIDs.at(pid) );
352  that->_waitPIDs.erase( pid );
353 
354  if ( data.callback )
355  data.callback( pid, status );
356 
357  g_spawn_close_pid( pid );
358 
359  // no need to take care of releasing the GSource, the event loop took care of that
360 
361  } catch ( const std::out_of_range &e ) {
362  return;
363  }
364 }
365 
367 
369  : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx), *this ) )
370 {
371 }
372 
374 {
375 }
376 
377 void EventDispatcher::updateEventSource( AbstractEventSource &notifier, int fd, int mode )
378 {
379  Z_D();
380  if ( notifier.eventDispatcher().lock().get() != this )
381  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
382 
383  AbstractEventSource *notifyPtr = &notifier;
384 
385  GAbstractEventSource *evSrc = nullptr;
386  auto &evSrcList = d->_eventSources;
387  auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ]( const auto elem ){ return elem->eventSource == notifyPtr; } );
388  if ( itToEvSrc == evSrcList.end() ) {
389 
390  evSrc = GAbstractEventSource::create( d );
391  evSrc->eventSource = notifyPtr;
392  evSrcList.push_back( evSrc );
393 
394  g_source_attach( &evSrc->source, d->_ctx );
395 
396  } else
397  evSrc = (*itToEvSrc);
398 
399  int cond = evModeToMask( mode );
400  auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
401  return currPollFd.pollfd == fd;
402  });
403 
404  if ( it != evSrc->pollfds.end() ) {
405  //found
406  it->reqEvents = static_cast<GIOCondition>( cond );
407  g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
408  } else {
409  evSrc->pollfds.push_back(
410  GUnixPollFD {
411  static_cast<GIOCondition>(cond),
412  fd,
413  g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
414  }
415  );
416  }
417 }
418 
420 {
421  Z_D();
422 
423  AbstractEventSource *ptr = &notifier;
424 
425  if ( notifier.eventDispatcher().lock().get() != this )
426  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
427 
428  auto &evList = d->_eventSources;
429  auto it = std::find_if( evList.begin(), evList.end(), [ ptr ]( const auto elem ){ return elem->eventSource == ptr; } );
430 
431  if ( it == evList.end() )
432  return;
433 
434  auto &fdList = (*it)->pollfds;
435 
436  if ( fd == -1 ) {
437  //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
438  //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
439  //for the fd's
440  for ( auto &pFD : fdList ) {
441  if ( pFD.tag )
442  g_source_remove_unix_fd( &(*it)->source, pFD.tag );
443  pFD.pollfd = -1;
444  pFD.tag = nullptr; //mark as orphaned, do not delete the element here this might break dipatching
445  }
446  } else {
447  auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
448  if ( fdIt != fdList.end() ) {
449  if ( fdIt->tag )
450  g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
451  //also do not remove here, mark as orphaned only to not break iterating in dispatch()
452  fdIt->tag = nullptr;
453  fdIt->pollfd = -1;
454  }
455  }
456 }
457 
459 {
460  Z_D();
461  //make sure timer is not double registered
462  for ( const GLibTimerSource *t : d->_runningTimers ) {
463  if ( t->_t == &timer )
464  return;
465  }
466 
468  newSrc->_t = &timer;
469  d->_runningTimers.push_back( newSrc );
470 
471  g_source_attach( &newSrc->source, d->_ctx );
472 }
473 
475 {
476  Z_D();
477  auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ]( const GLibTimerSource *src ){
478  return src->_t == &timer;
479  });
480 
481  if ( it != d->_runningTimers.end() ) {
482  GLibTimerSource *src = *it;
483  d->_runningTimers.erase( it );
485  }
486 }
487 
489 {
490  return d_func()->_ctx;
491 }
492 
493 bool EventDispatcher::waitForFdEvent( const int fd, int events , int &revents , int &timeout )
494 {
495  GPollFD pollFd;
496  pollFd.fd = fd;
497  pollFd.events = evModeToMask(events);
498 
499  bool eventTriggered = false;
500  zypp::AutoDispose<GTimer *> timer( g_timer_new(), &g_timer_destroy );
501  while ( !eventTriggered ) {
502  g_timer_start( *timer );
503  const int res = eintrSafeCall( g_poll, &pollFd, 1, timeout );
504  switch ( res ) {
505  case 0: //timeout
506  timeout = 0;
507  return false;
508  case -1: { // interrupt
509  // if timeout is -1 we wait until eternity
510  if ( timeout == -1 )
511  continue;
512 
513  timeout -= g_timer_elapsed( *timer, nullptr );
514  if ( timeout < 0 ) timeout = 0;
515  if ( timeout <= 0 )
516  return false;
517 
518  ERR << "g_poll error: " << strerror(errno) << std::endl;
519  return false;
520  }
521  case 1:
522  eventTriggered = true;
523  break;
524  }
525  }
526 
527  revents = gioConditionToEventTypes( (GIOCondition)pollFd.revents, evModeToMask(events) );
528  return true;
529 }
530 
531 void EventDispatcher::trackChildProcess( int pid, std::function<void (int, int)> callback )
532 {
533  Z_D();
534  GlibWaitPIDData data ( pid );
535  data.callback = std::move(callback);
536 
537  g_source_set_callback ( data.source, reinterpret_cast<GSourceFunc>(&EventDispatcherPrivate::waitPidCallback), d_ptr.get(), nullptr );
538  data.tag = g_source_attach ( data.source, d->_ctx );
539  d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
540 }
541 
543 {
544  Z_D();
545  try {
546  d->_waitPIDs.erase( pid );
547  } catch ( const std::out_of_range &e ) {
548  return false;
549  }
550  return true;
551 }
552 
554 {
555  Z_D();
556  // lazy init
557  UnixSignalSourceRef r;
558  if ( d->_signalSource.expired ()) {
559  d->_signalSource = r = UnixSignalSource::create();
560  } else {
561  r = d->_signalSource.lock ();
562  }
563  return r;
564 }
565 
567 {
568  return g_main_context_iteration( d_func()->_ctx, false );
569 }
570 
572 {
573  auto d = instance()->d_func();
574  d->_idleFuncs.push( std::move(callback) );
575  d->enableIdleSource();
576 }
577 
578 void EventDispatcher::unrefLaterImpl( std::shared_ptr<void> &&ptr )
579 {
580  Z_D();
581  d->_unrefLater.push_back( std::move(ptr) );
582  d->enableIdleSource();
583 }
584 
586 {
587  d_func()->_unrefLater.clear();
588 }
589 
591 {
592  return d_func()->_runningTimers.size();
593 }
594 
595 std::shared_ptr<EventDispatcher> EventDispatcher::instance()
596 {
597  return ThreadData::current().dispatcher();
598 }
599 
600 void EventDispatcher::setThreadDispatcher(const std::shared_ptr<EventDispatcher> &disp)
601 {
603 }
604 
605 }
DlContextRefType _ctx
Definition: rpmmd.cc:69
virtual void removeTimer(Timer &timer)
std::vector< std::shared_ptr< void > > _unrefLater
std::vector< GAbstractEventSource * > _eventSources
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition: Exception.h:424
std::function< bool()> IdleFunction
static UnixSignalSourceRef create()
static void destruct(GAbstractEventSource *src)
virtual void onFdReady(int fd, int events)=0
static int writeMask()
static std::shared_ptr< EventDispatcher > create()
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
static gboolean check(GSource *source)
std::shared_ptr< EventDispatcher > dispatcher()
Definition: threaddata.cc:57
#define ERR
Definition: Logger.h:102
static GLibTimerSource * create()
#define Z_D()
Definition: zyppglobal.h:105
struct _GPollFD GPollFD
Definition: ZYppImpl.h:26
EventDispatcher::WaitPidCallback callback
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user...
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
std::string strerror(int errno_r)
Return string describing the error_r code.
Definition: String.cc:54
static void destruct(GLibTimerSource *src)
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
static GSourceFuncs glibTimerSourceFuncs
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
std::vector< GLibTimerSource * > _runningTimers
static std::shared_ptr< EventDispatcher > instance()
static GSourceFuncs abstractEventSourceFuncs
EventDispatcherPrivate * _ev
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
virtual void registerTimer(Timer &timer)
static gboolean prepare(GSource *, gint *timeout)
static ZYPP_API ThreadData & current()
Definition: threaddata.cc:16
std::vector< GUnixPollFD > pollfds
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
Base class for Exception.
Definition: Exception.h:146
std::queue< EventDispatcher::IdleFunction > _idleFuncs
static int excpMask()
auto eintrSafeCall(Fun &&function, Args &&... args)
static int evModeToMask(int mode)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition: AutoDispose.h:94
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static int readMask()
static gboolean prepare(GSource *src, gint *timeout)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
std::weak_ptr< EventDispatcher > eventDispatcher() const
std::unique_ptr< BasePrivate > d_ptr
Definition: base.h:174
void invokeOnIdleImpl(IdleFunction &&callback)
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
Definition: threaddata.cc:42
UnixSignalSourceRef unixSignalSource()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
std::shared_ptr< T > shared_this() const
Definition: base.h:113
static gboolean check(GSource *source)
std::unordered_map< int, GlibWaitPIDData > _waitPIDs