[job] inline 'dispatch' loop

This commit is contained in:
Mike Fährmann
2025-12-05 11:48:51 +01:00
parent 402f53616b
commit d9c1d15aee
2 changed files with 30 additions and 29 deletions

View File

@@ -145,7 +145,6 @@ class Job():
"""Execute or run the job"""
extractor = self.extractor
log = extractor.log
msg = None
self._init()
@@ -156,8 +155,7 @@ class Job():
extractor.sleep(sleep(), "extractor")
try:
for msg in extractor:
self.dispatch(msg)
msg = self.dispatch(extractor)
except exception.StopExtraction as exc:
if exc.depth > 1 and exc.target != extractor.__class__.subcategory:
exc.depth -= 1
@@ -203,31 +201,36 @@ class Job():
self.status |= s
return self.status
def dispatch(self, msg):
def dispatch(self, messages):
"""Call the appropriate message handler"""
if msg[0] == Message.Url:
_, url, kwdict = msg
if self.metadata_url:
kwdict[self.metadata_url] = url
if self.pred_url(url, kwdict):
self.update_kwdict(kwdict)
self.handle_url(url, kwdict)
if FLAGS.FILE is not None:
FLAGS.process("FILE")
msg = None
elif msg[0] == Message.Directory:
self.update_kwdict(msg[1])
self.handle_directory(msg[1])
for msg in messages:
if msg[0] == Message.Url:
_, url, kwdict = msg
if self.metadata_url:
kwdict[self.metadata_url] = url
if self.pred_url(url, kwdict):
self.update_kwdict(kwdict)
self.handle_url(url, kwdict)
if FLAGS.FILE is not None:
FLAGS.process("FILE")
elif msg[0] == Message.Queue:
_, url, kwdict = msg
if self.metadata_url:
kwdict[self.metadata_url] = url
if self.pred_queue(url, kwdict):
self.update_kwdict(kwdict)
self.handle_queue(url, kwdict)
if FLAGS.CHILD is not None:
FLAGS.process("CHILD")
elif msg[0] == Message.Directory:
self.update_kwdict(msg[1])
self.handle_directory(msg[1])
elif msg[0] == Message.Queue:
_, url, kwdict = msg
if self.metadata_url:
kwdict[self.metadata_url] = url
if self.pred_queue(url, kwdict):
self.update_kwdict(kwdict)
self.handle_queue(url, kwdict)
if FLAGS.CHILD is not None:
FLAGS.process("CHILD")
return msg
def handle_url(self, url, kwdict):
"""Handle Message.Url"""
@@ -948,8 +951,7 @@ class DataJob(Job):
# collect data
try:
for msg in extractor:
self.dispatch(msg)
self.dispatch(extractor)
except exception.StopExtraction:
pass
except Exception as exc: