
Commit: merge

louisaturn committed Jun 23, 2023
2 parents 06829cc + d22e004, commit 3129042
Showing 22 changed files with 1,095 additions and 155 deletions.
30 changes: 12 additions & 18 deletions crawler_manager/crawler_manager.py
@@ -8,23 +8,22 @@

from crawling_utils import system_is_deploying

message_sender = None
if not system_is_deploying():
    message_sender = MessageSender()


def log_writer_executor():
    '''Redirects log_writer output and starts descriptor consumer loop.'''
    if not system_is_deploying():
        LogWriter.log_consumer()
MESSAGE_SENDER = None
LOG_WRITER = None

if not system_is_deploying():
    MESSAGE_SENDER = MessageSender()

def run_spider_manager_listener():
def run_kafka_listeners():
    '''Start spider_manager message consumer loop'''
    global LOG_WRITER

    if not system_is_deploying():
        sm_listener = SpiderManagerListener()
        sm_listener.run()


        LOG_WRITER = LogWriter()
        LOG_WRITER.run()

def gen_key():
    """Generates a unique key based on time and a random seed."""
@@ -42,7 +41,7 @@ def start_crawler(config: dict):
config["crawler_id"] = config["id"]
del config["id"]

message_sender.send_start_crawl(config)
MESSAGE_SENDER.send_start_crawl(config)


def stop_crawler(crawler_id):
Expand All @@ -52,9 +51,4 @@ def stop_crawler(crawler_id):
- crawler_id: Uniquer crawler identifier
"""
message_sender.send_stop_crawl(str(crawler_id))


def update_instances_info(data_path: str, instance_id: str, instance: dict):
"""Updates the file with information about instances when they are created, initialized or terminated."""
pass
MESSAGE_SENDER.send_stop_crawl(str(crawler_id))
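Note on this hunk: the two previous entry points (log_writer_executor and run_spider_manager_listener) collapse into a single run_kafka_listeners() call, with MESSAGE_SENDER and LOG_WRITER held as module-level globals that stay None while the system is deploying. A minimal usage sketch follows; the startup hook name is a placeholder, and the call order mirrors the main/apps.py hunk later in this commit.

# Sketch only -- mirrors how main/apps.py drives this module after the change.
from crawler_manager import crawler_manager

def on_app_ready():  # hypothetical startup hook
    # Starts the SpiderManagerListener and the LogWriter consumer thread in one
    # call; both are skipped when system_is_deploying() returns True.
    crawler_manager.run_kafka_listeners()

    # Crawl commands then go through the module-level MESSAGE_SENDER.
    config = {'id': 42, 'source_name': 'example'}  # hypothetical crawler config
    crawler_manager.start_crawler(config)
    crawler_manager.stop_crawler(42)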
59 changes: 43 additions & 16 deletions crawler_manager/log_writer.py
@@ -2,12 +2,12 @@

import os
import ujson
from pathlib import Path

import threading
from django.apps import apps
from kafka import KafkaConsumer

from django.db.utils import IntegrityError
from crawler_manager import settings
from crawling_utils import system_is_deploying

class LogWriter():
    """
@@ -17,9 +17,27 @@ class LogWriter():
    consumer_params: get parameters for a KafkaConsumer.
    """
    def __init__(self) -> None:
        self.log_model = apps.get_model('main', 'Log')
        self.__ignore_logs_from_instances = set()

    def add_instance_to_ignore(self, instance_id: str):
        instance_id = str(instance_id)
        self.__ignore_logs_from_instances.add(instance_id)

    def remove_instance_to_ignore(self, instance_id: str):
        instance_id = str(instance_id)
        if instance_id in self.__ignore_logs_from_instances:
            self.__ignore_logs_from_instances.remove(instance_id)

    def log_consumer(self):
        """
        This is a kafka consumer and parser for each message.
        """

        # KafkaConsumer parameters dictionary:
        DEFAULT_CONSUMER_PARAMS = dict(bootstrap_servers=settings.KAFKA_HOSTS,
        consumer = KafkaConsumer(settings.LOGGING_TOPIC,
            bootstrap_servers=settings.KAFKA_HOSTS,
            auto_offset_reset=settings.KAFKA_CONSUMER_AUTO_OFFSET_RESET,
            connections_max_idle_ms=settings.KAFKA_CONNECTIONS_MAX_IDLE_MS,
            request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
@@ -28,16 +46,15 @@ class LogWriter():
            enable_auto_commit=settings.KAFKA_CONSUMER_AUTO_COMMIT_ENABLE,
            max_partition_fetch_bytes=settings.KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES)

    @staticmethod
    def log_consumer(params=DEFAULT_CONSUMER_PARAMS):
        """
        This is a kafka consumer and parser for each message.
        """
        consumer = KafkaConsumer(settings.LOGGING_TOPIC, **params)
        execution_context = ''
        for message in consumer:
            try:
                message = ujson.loads(message.value.decode('utf-8'))
                execution_context = message['execution_context']

                instance_id = message['instance_id']
                if instance_id in self.__ignore_logs_from_instances:
                    continue

                log = {}
                log['cid'] = message['crawler_id']
@@ -48,13 +65,23 @@ def log_consumer(params=DEFAULT_CONSUMER_PARAMS):
                log['msg'] = message['message']
                log['lvl'] = message['levelname']

                LogWriter.log_writer(log)

                self.log_writer(log)

            except IntegrityError:
                # the instance the log is associated with was deleted
                if execution_context != 'testing':
                    raise

            except Exception as e:
                print(f'Error processing message: {e}')

    @staticmethod
    def log_writer(log):

    def run(self):
        if not system_is_deploying():
            thread = threading.Thread(target=self.log_consumer, daemon=True)
            thread.start()

    def log_writer(self, log):
        """
        This method writes the log to the database
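Taken together, LogWriter changes from a pair of static methods into a long-lived object: run() starts log_consumer() on a daemon thread (skipped while deploying), and the private __ignore_logs_from_instances set lets callers mute log persistence for chosen instances. A rough usage sketch, assuming a single shared instance such as the LOG_WRITER global created in crawler_manager.py; the instance id value is hypothetical.

from crawler_manager.log_writer import LogWriter

log_writer = LogWriter()
log_writer.run()  # no-op while deploying; otherwise spawns the Kafka consumer thread

# Silence a test instance's logs for the duration of its run, then restore them.
test_instance_id = '1687500000000'  # hypothetical instance id
log_writer.add_instance_to_ignore(test_instance_id)
# ... run the test crawl ...
log_writer.remove_instance_to_ignore(test_instance_id)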
5 changes: 4 additions & 1 deletion interface/settings.py
@@ -44,7 +44,7 @@
SECRET_KEY = get_random_secret_key()

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env('DEBUG')
DEBUG = True#env('DEBUG')

ALLOWED_HOSTS = env('DJANGO_ALLOWED_HOSTS')

@@ -210,3 +210,6 @@

# Folder where collector outputs are stored
OUTPUT_FOLDER = env('OUTPUT_FOLDER')

# maximum runtime for a crawler functional test, in seconds
RUNTIME_OF_CRAWLER_TEST = 120
11 changes: 3 additions & 8 deletions main/apps.py
@@ -1,15 +1,13 @@
import os
import json
import signal
from threading import Thread

from django.apps import AppConfig
from django.db.utils import OperationalError

from crawler_manager.crawler_manager import log_writer_executor
from step_crawler import functions_file
from step_crawler import parameter_extractor
from crawler_manager.crawler_manager import run_spider_manager_listener
from crawler_manager.crawler_manager import run_kafka_listeners

import json
import os
@@ -42,12 +40,9 @@ def runOnce(self):
        for instance in instances:
            instance.running = False
            instance.save()

        run_kafka_listeners()

        # starts kafka log consumer
        log_writer_exec_thread = Thread(target=log_writer_executor, daemon=True)
        log_writer_exec_thread.start()

        run_spider_manager_listener()

    def ready(self):
        try:
47 changes: 47 additions & 0 deletions main/crawling_timer.py
@@ -0,0 +1,47 @@
from datetime import datetime
import threading
from time import sleep

import requests
from rest_framework import status

class CrawlingTimer:
    def __init__(self,
                 crawler_id: str,
                 test_instance_id: str,
                 data_path: str,
                 runtime: float = 300,
                 server_address: str = 'http://web:8000',
                 stop_crawler_endpoint: str = '/api/crawlers/{crawler_id}/stop_test') -> None:

        self.crawler_id = crawler_id
        self.test_instance_id = test_instance_id
        self.data_path = data_path

        self.runtime = runtime

        self.__stop_crawler_url_template = server_address + stop_crawler_endpoint

    def _start(self):
        print(f'[{datetime.now()}] CrawlingTimer: Waiting {self.crawler_id} crawling for {self.runtime}s using the instance {self.test_instance_id}...')

        sleep(self.runtime)

        print(f'[{datetime.now()}] CrawlingTimer: Sending stop signal to {self.crawler_id}...')

        stop_crawler_url = self.__stop_crawler_url_template.format(crawler_id=self.crawler_id)
        response = requests.get(stop_crawler_url)

        if response.status_code == status.HTTP_204_NO_CONTENT:
            print(f'[{datetime.now()}] CrawlingTimer: Request to stop {self.crawler_id} sent successfully!')

        else:
            print(f'[{datetime.now()}] CrawlingTimer: Error trying to send stop signal to {self.crawler_id}: [{response.status_code}] {response.text}')

    def start(self):
        print(f'[{datetime.now()}] CrawlingTimer: Creating thread to test {self.crawler_id} for {self.runtime} seconds!')

        thread = threading.Thread(target=self._start, daemon=True)
        thread.start()

        print(f'[{datetime.now()}] CrawlingTimer: Thread test {self.crawler_id} started!')
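CrawlingTimer is a fire-and-forget watchdog: start() spawns a daemon thread that sleeps for runtime seconds and then issues an HTTP GET against the stop_test endpoint. A usage sketch follows; wiring it to RUNTIME_OF_CRAWLER_TEST from interface/settings.py is an assumption, and the id and data_path values are placeholders, since the view that instantiates the timer is not part of this diff.

from django.conf import settings
from main.crawling_timer import CrawlingTimer

timer = CrawlingTimer(
    crawler_id='17',                    # hypothetical crawler id
    test_instance_id='1687500000000',   # hypothetical test instance id
    data_path='example_source',         # hypothetical data_path of the crawler
    runtime=settings.RUNTIME_OF_CRAWLER_TEST,
)
timer.start()  # returns immediately; the stop request fires after the runtime elapses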
9 changes: 9 additions & 0 deletions main/forms.py
@@ -63,6 +63,7 @@ class Meta:
            'video_recording_enabled',

            'browser_type',
            'browser_user_agent',
            'skip_iter_errors',
            'browser_resolution_width',
            'browser_resolution_height',
@@ -432,6 +433,14 @@ class RawCrawlRequestForm(CrawlRequestForm):
        widget=forms.RadioSelect
    )

    browser_user_agent = forms.CharField(
        label='User agent',
        help_text='Use, de preferência, um user-agent que combine com o Navegador Web escolhido, seja o sugerido automaticamente abaixo ou outro de sua preferência.',
        initial='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
        required=False,
        widget=forms.TextInput()
    )

    skip_iter_errors = forms.BooleanField(
        required=False, label="Pular iterações com erro"
    )
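The form field above only collects the user-agent string; applying it to the dynamic-processing browser happens outside this diff. Purely as an illustration of how a Chromium-based driver can consume such a value (generic Selenium usage, not this project's code):

from selenium import webdriver

user_agent = ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
              '(KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36')

options = webdriver.ChromeOptions()
options.add_argument(f'--user-agent={user_agent}')  # standard Chromium command-line flag
driver = webdriver.Chrome(options=options)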
48 changes: 48 additions & 0 deletions main/models.py
@@ -1,4 +1,5 @@
import datetime
import os
from typing import List, Union

from crawler_manager.constants import *
@@ -63,6 +64,17 @@ class CrawlRequest(TimeStamped):
        'Esse não é um caminho relativo válido.')
    data_path = models.CharField(max_length=2000, validators=[pathValid])

    FUNCTIONAL_STATUS_CHOICES = [
        ('testing', 'Testando'),
        ('testing_by_crawling', 'Testando via coleta'),
        ('non_functional', 'Não funcional'),
        ('functional', 'Funcional'),
        ('not_tested', 'Não testado')
    ]

    functional_status = models.CharField(max_length=32, default='not_tested', choices=FUNCTIONAL_STATUS_CHOICES)
    date_last_functional_test = models.DateTimeField(blank=True, null=True)

    # SCRAPY CLUSTER ##########################################################

    # Don't cleanup redis queues, allows to pause/resume crawls.
@@ -171,6 +183,7 @@ class CrawlRequest(TimeStamped):
        ('firefox', 'Mozilla Firefox'),
    ]
    browser_type = models.CharField(max_length=50, choices=BROWSER_TYPE, default='chromium')
    browser_user_agent = models.CharField(max_length=500, blank=True, null=True)

    # If true, skips failing iterations with a warning, else, stops the crawler
    # if an iteration fails
@@ -346,6 +359,7 @@ def running_instance(self):
            return inst_query.get()
        return None


    @property
    def last_instance(self):
        last_instance = self.instances.order_by('-creation_date')[:1]
@@ -354,6 +368,32 @@ def last_instance(self):
        except:
            return None

    def __check_if_crawler_worked(self, instance_id) -> bool:
        files_path = f'/data/{self.data_path}/{instance_id}/data/'

        raw_pages_crawled = os.listdir(files_path + 'raw_pages/')
        files_crawled = os.listdir(files_path + 'files/')

        for ignore_file in ('file_description.jsonl', 'temp', 'browser_downloads'):
            if ignore_file in raw_pages_crawled:
                raw_pages_crawled.remove(ignore_file)

            if ignore_file in files_crawled:
                files_crawled.remove(ignore_file)

        if not raw_pages_crawled and not files_crawled:
            return False

        return True

    def update_functional_status_after_run(self, instance_id):
        crawler_worked = self.__check_if_crawler_worked(instance_id)

        self.functional_status = 'functional' if crawler_worked else 'non_functional'
        self.date_last_functional_test = timezone.now()

        self.save()

    def __str__(self):
        return self.source_name

@@ -450,7 +490,15 @@ class ResponseHandler(models.Model):
class CrawlerInstance(TimeStamped):
    crawler = models.ForeignKey(CrawlRequest, on_delete=models.CASCADE,
                                related_name='instances')

    instance_id = models.BigIntegerField(primary_key=True)

    EXECUTION_CONTEXT_CHOICES = [
        ('crawling', 'Coleta'),
        ('testing', 'Teste')
    ]

    execution_context = models.CharField(max_length=8, default='crawling', choices=EXECUTION_CONTEXT_CHOICES)

    number_files_found = models.PositiveIntegerField(default=0, null=True, blank=True)
    number_files_success_download = models.PositiveIntegerField(default=0, null=True, blank=True)
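The new functional-status machinery is meant to run right after a test instance finishes: __check_if_crawler_worked looks for anything besides the bookkeeping entries ('file_description.jsonl', 'temp', 'browser_downloads') in the instance's raw_pages/ and files/ folders, and update_functional_status_after_run records the verdict plus a timestamp. A sketch of the expected call site; only the two model methods come from this diff, while the helper below and where it is invoked (e.g. a stop_test view) are assumptions.

from main.models import CrawlRequest

def finish_crawler_test(crawler_id: int, instance_id: int) -> None:
    """Hypothetical helper: mark a crawler functional or non-functional after a test run."""
    crawler = CrawlRequest.objects.get(pk=crawler_id)
    crawler.update_functional_status_after_run(instance_id)
    # functional_status is now 'functional' if any raw page or file was collected,
    # otherwise 'non_functional'; date_last_functional_test holds the check time.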
2 changes: 1 addition & 1 deletion main/staticfiles/css/style.css
@@ -176,7 +176,7 @@ table.dataTable thead .sorting_desc_disabled:before {
background-color: #E8F6EF;
}

#dynamic-processing-item-wrap.disabled, #dynamic-processing-skip-errors.disabled, #dynamic-processing-browser-type.disabled, #dynamic-processing-resolution.disabled, #dynamic-processing-debug-mode.disabled {
#dynamic-processing-item-wrap.disabled, #dynamic-processing-skip-errors.disabled, #dynamic-processing-browser-type.disabled, #dynamic-processing-resolution.disabled, #dynamic-processing-debug-mode.disabled, #dynamic-processing-browser-user-agent.disabled {
opacity: .5;
pointer-events: none;
}