Generic Website Data Extraction
Hello All,
In this blog you can learn about generic data extraction from multiple websites and some of the key ideas that help achieve it.
The language used is Python, and anyone with a basic understanding of it should find this blog useful.
We have all come across situations where we extract data from one particular website, so we write static logic for that site. But what if the same has to be done on generic pages? For example, you are asked to search for a particular pattern or keyword across a list of websites that are independent of one another.
In this blog, as well as in the attached video, we provide a step-by-step explanation of multi-website data extraction using simple, basic Python libraries. There is no use of any existing web-scraping framework such as Scrapy or other spider libraries.
This involves the following steps.
Step 1: Import Dataset
We can either create our own dataset, get one from clients (if you are asked to do so), or use open sources. A short loading sketch follows the imports below.
## Libraries
import requests
import pandas as pd
import lxml
import tldextract
from bs4 import BeautifulSoup
import sys, os, re
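As a starting point, here is a minimal sketch of loading such a dataset, assuming a file named dataset.csv with a single URL column (the same shape the scenario script later in this post expects):
## load the dataset of websites (illustrative - adjust the file name and column to your data)
dataset = pd.read_csv('dataset.csv', encoding='utf-8')
print(len(dataset), 'websites loaded')
print(dataset['URL'].head())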
Step 2: Learn Dataset
- Say we have 1000 websites: we can run a manual unit test on a random sample of about 20 of them,
searching for the keyword or pattern that most of the websites usually follow.
Why do we do this? It helps us write generic logic that achieves fair results.
We can create a blacklist to filter out negative patterns. Say you are looking for the keyword 'apple', meaning the Apple product and not the fruit. The first word you would add to the blacklist is 'fruit': when we come across a page that talks about apple and it also contains the keyword 'fruit', we can ignore that page/website and move on to the next, rather than wasting time extracting content from it.
Similarly, creating a whitelist helps us confirm that a page is relevant, so we can clearly flag it as +ve, True, Found or whatever the requirement calls for.
The whitelist also helps us stop extracting from the same domain early once we have enough positive keywords. Say you find 1000+ links on a single website; if you try to crawl every link of that website, it would end up in a mess. It is better to set a threshold, step out of that particular domain and continue with the next. (A minimal sketch of such a blacklist and threshold follows the keyword lists below.)
## whitelist keywords, keyed by crawl depth
keywords = {
0:['travel', 'legal','en','policies', 'admin', 'policy','iwindocuments', 'wp', 'manual', 'onboarding', 'doc', 'pdf', 'about'],
1:['travel', 'files', 'policies', 'legal','administr' , 'financial', 'university', 'entertainment', 'policy','doc', 'controller', 'hr', 'uploads', 'pdf', 'responsible', 'corporate', 'administration', 'offices', 'services'],
}
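The helper functions further below also refer to a black_list and a negative_threshold; here is a minimal sketch of those, with purely illustrative values that you would replace with the negative terms learned from your own dataset:
## blacklist - negative context words (illustrative values only)
black_list = ['booking', 'holiday', 'tour', 'flight', 'hotel', 'package']
## maximum number of blacklist words tolerated around a keyword match
negative_threshold = 15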
Step 3: Extract content from websites
Extraction is the next step. Now that our preprocessing is ready, we can start writing the logic to extract content from the websites:
1. Collect all links from the website (a short sketch follows this list).
2. Based on the requirement, write a regular expression pattern to validate the HTML content.
3. Parse the HTML content and check for occurrences of the validation pattern in it.
4. If found, flag it; if not, traverse to the next link of that particular website.
Follow the blacklist and whitelist thresholds so we do not waste time extracting content from the same website.
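Here is a minimal sketch of step 1, collecting all links from a single page with requests and BeautifulSoup (the URL is only a placeholder):
## collect all links from one page (illustrative)
page_url = 'https://example.com'          ## placeholder URL
resp = requests.get(page_url, timeout=10)
soup = BeautifulSoup(resp.content, 'lxml')
all_links = [(a['href'], a.text.strip()) for a in soup.find_all('a', href=True)]
print(len(all_links), 'links collected')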
## html parser
def parse_html(content):
    ## This function accepts response.content from the request response and returns the clean text
    ## from the html content
    from bs4.element import Comment
    soup = BeautifulSoup(content, 'html.parser')
    text_elements = soup.findAll(text=True)
    tag_blacklist = ['style', 'script', 'head', 'title', 'meta', '[document]', 'img']
    clean_text = []
    for element in text_elements:
        if element.parent.name in tag_blacklist or isinstance(element, Comment):
            continue
        else:
            text_ = element.strip()
            clean_text.append(text_)
    result_text = " ".join(clean_text)
    ## strip carriage returns / newlines
    result_text = re.sub(r'[\r\n]', '', result_text)
    tag_remove_pattern = re.compile(r'<[^>]+>')
    result_text = tag_remove_pattern.sub('', result_text)
    result_text = re.sub(r'\\', '', result_text)
    return result_text
def fetch_desired_pat(pure_text):
    ## This function accepts the pure text from parse_html above,
    ## checks for the regex pattern in the text and returns 'Positive', 'Negative'
    ## or an empty string when no match is found.
    ## It relies on the globals black_list and negative_threshold (set to 15).
    detected_flag = ''
    re_pattern = re.compile(r"(travel[- _\/].{0,15}?polic(?:y|ies))", re.IGNORECASE)
    pat = re_pattern.findall(pure_text)
    if pat:
        found_at = re.search(re_pattern, pure_text).start()
        start = 0 if found_at - 200 < 0 else found_at - 200
        end = found_at + 100
        context = pure_text[start:end]
        neg_words = [word for word in context.split() if word in black_list]
        flag = 'Negative'
        if len(neg_words) > negative_threshold:
            flag = 'Negative'
        else:
            flag = 'Positive'
        detected_flag = flag
    return detected_flag
The parse_html function is explained in detail in one of our posts - Post reference
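As a quick sanity check, the two helpers can be chained on a single page like this (the URL is a placeholder, and black_list / negative_threshold must be defined as sketched earlier):
## quick check of the two helpers on one page (illustrative)
test_url = 'https://example.com/travel-policy'   ## placeholder URL
response = requests.get(test_url, timeout=10)
pure_text = parse_html(response.content)
print(fetch_desired_pat(pure_text))   ## 'Positive', 'Negative' or '' when no match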
Step 4: Exporting the obtained data into CSV
We will export the results to a CSV file and then perform manual validation to cross-verify whether our algorithm works well or not.
Scenario: Travel Policy detection
So here is the scenario: we got this project and successfully delivered results that met the requirements. We are not going to show you the entire dataset, as we are not supposed to; we will show only a few rows, but I can assure you it is a great enough starting point.
We are asked to find websites that contain a travel policy. It is not necessary to find the actual document or its content; we just want to confirm that these websites say something relevant to a travel policy.
Each website behaves differently:
- Forbidden (due to region scope)
- Timeout error
- http/https issues
- Robot detection
- Other reasons
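With the requests library, these failures show up as different exceptions or status codes; the sketch below (using the same url and bot_header variables the script further down defines) is one possible way to tell them apart, and retrying the http:// variant is just one option, not part of the script itself:
## distinguishing the common failure modes (illustrative)
try:
    resp = requests.get(url, headers=bot_header, timeout=40)
    if resp.status_code == 403:
        print('\t Forbidden - region restriction or robot detection')
except requests.exceptions.Timeout:
    print('\t Timeout error')
except requests.exceptions.SSLError:
    print('\t https issue - one option is to retry the http:// variant')
except requests.exceptions.RequestException as e:
    print('\t Other reason:', e)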
Solution
Exception handling - HTTPSConnectionPool error (Forbidden)
dataset = pd.read_csv('dataset.csv', encoding='utf-8')
bot_header = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.45 Safari/537.36'}
url_dataset = {}
for idx, url in enumerate(dataset['URL']):
    url_dataset[url] = {'collected_links': [], 'detected_urls': [], 'overall_status': '', 'links_visited': 0}
    depth = 0
    negative_threshold = 15
    reg_dom = tldextract.extract(url).registered_domain
    print(idx, url, reg_dom)
    collected_links = url_dataset[url]['collected_links']
    links_visited = url_dataset[url]['links_visited']
    overall_stat = 'Negative'
    url_dataset[url]['overall_status'] = overall_stat
    try:
        resp = requests.get(url, headers=bot_header, timeout=40)
    except:
        print('\t Site cant be reached')
        continue
    soup = BeautifulSoup(resp.content, 'lxml')
    all_links = soup.find_all('a', href=True)
    all_links = list(set([(ele['href'], ele.text) for ele in all_links]))
    for aidx, link in enumerate(all_links):
        try:
            ele = link[0]
            e_text = link[1].lower()
            proceed_link = False
            eligible = []
            for kw in keywords[depth]:
                if (kw in e_text) or (kw in ele.lower()):
                    eligible.append(kw)
            if len(eligible) >= 2:
                proceed_link = True
            if not proceed_link:
                continue
            if 'http' not in ele and reg_dom not in ele:
                href_correction = ele[1:] if ele.startswith('/') else ele
            else:
                href_correction = ele
            obtained_url = build_url(href_correction, url)
            links_visited += 1
            response = requests.get(obtained_url, headers=bot_header, timeout=10)
            html_content = parse_html(response.content)
            detected_flag = fetch_desired_pat(html_content)
            if detected_flag == 'Positive':
                url_dataset[url]['detected_urls'].append(obtained_url)
                url_dataset[url]['detected_urls'] = list(set(url_dataset[url]['detected_urls']))
            collected_links.append((obtained_url, detected_flag))
            pos_neg = [cl[1] for cl in collected_links]
            if pos_neg.count('Positive') > 10 and pos_neg.count('Positive') > pos_neg.count('Negative'):
                overall_stat = 'Positive'
                url_dataset[url]['overall_status'] = overall_stat
                break
            else:
                if pos_neg.count('Negative') > 10:
                    overall_stat = 'Negative'
                    url_dataset[url]['overall_status'] = overall_stat
                    break
            if overall_stat == 'Positive':
                url_dataset[url]['overall_status'] = overall_stat
                break
            if overall_stat == 'Negative' and len(collected_links) > 50:
                pos_neg = [cl[1] for cl in collected_links]
                if pos_neg.count('Positive') > pos_neg.count('Negative'):
                    overall_stat = 'Positive'
                url_dataset[url]['overall_status'] = overall_stat
                break
            url_dataset[url]['overall_status'] = overall_stat
        except Exception as e:
            print('Overall Exception', str(e))
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            print(exc_type, fname, exc_tb.tb_lineno)
            if 'HTTPSConnectionPool' in str(e):
                break
    if overall_stat == 'Positive':
        print('\t +ve Kw Found')
        url_dataset[url]['collected_links'] = []
        continue
    else:
        url_dataset[url]['collected_links'] = []
        print('\t No Positive kw found')
    print('##################################################################')
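The script calls a build_url helper that is not shown above; here is a minimal sketch, assuming it only needs to resolve a (possibly relative) href against the page URL:
## possible build_url helper - resolves relative hrefs against the base url
from urllib.parse import urljoin

def build_url(href, base_url):
    if href.startswith('http'):
        return href
    return urljoin(base_url, href)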
The above script fits our requirements for this scenario. If you have queries or concerns about this or any other topic related to data extraction, you can comment below and we will get back to you.
Converting a dictionary into a dataframe is explained in detail in our post.
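For reference, a minimal sketch of that conversion for the url_dataset dictionary built above:
## export the results for manual validation
result_df = pd.DataFrame.from_dict(url_dataset, orient='index')
result_df.index.name = 'URL'
result_df.to_csv('extraction_results.csv', encoding='utf-8')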



