import requests
import json
from datetime import datetime
from bs4 import BeautifulSoup
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
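
# Scrape bid-award announcements from the Guangxi government procurement portal
# (www.ccgp-guangxi.gov.cn): page through the keyword search results, fetch each
# announcement's detail record, pull the price / winning-supplier / contact fields
# out of the embedded HTML, and write the collected rows to an Excel file.
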
def fetch_data(url, data, headers):
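    """POST the search payload as JSON and return the decoded response, or None on failure."""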
try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=30)  # timeout so a stalled request fails fast
response.raise_for_status() # Raise an error for bad status codes (4xx or 5xx)
return response.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return None
def timestamp_to_datetime(timestamp):
# Convert milliseconds timestamp to seconds
timestamp_seconds = timestamp / 1000
# Convert to datetime object
return datetime.fromtimestamp(timestamp_seconds)
def fetch_article_details(article_id, headers):
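    """Fetch the detail record for a single announcement and return the decoded JSON, or None on failure."""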
url = f"http://www.ccgp-guangxi.gov.cn/portal/detail?articleId={article_id}&parentId=66485"
try:
        response = requests.get(url, headers=headers, timeout=30)  # timeout so a dead connection can't hang a worker
        response.raise_for_status()  # Raise an error for bad status codes (4xx or 5xx)
        return response.json()
except requests.exceptions.RequestException as e:
print(f"Request failed for article {article_id}: {e}")
return None
def process_entry(entry, headers):
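    """Build one result row: fetch the announcement detail, parse the embedded HTML for
    price, winning supplier and contact fields, and return a dict for the spreadsheet
    (returns None when the detail lookup fails or carries no data)."""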
publish_date = timestamp_to_datetime(entry.get('publishDate'))
article_id = entry.get('articleId')
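    # Percent-encode the articleId for the detail API call; the human-readable URL below keeps the raw id.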
encoded_article_id = urllib.parse.quote(article_id, safe='')
url = f"http://www.ccgp-guangxi.gov.cn/site/detail?parentId=66485&articleId={article_id}"
entry['publishDate'] = publish_date.strftime('%Y-%m-%d')
article_details = fetch_article_details(encoded_article_id, headers)
if article_details and article_details.get('success'):
article_data = article_details.get('result', {}).get('data', {})
if article_data:
            html_content = article_data.get('content')
soup = BeautifulSoup(html_content, 'html.parser')
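            # The announcement body is an HTML fragment; each field lives in an element
            # whose class name identifies it (e.g. code-summaryPrice).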
            # Extract the quoted price
price_tag = soup.find("td", class_="code-summaryPrice")
if price_tag is not None:
price_list = price_tag.text.split(":")
if len(price_list) > 1:
price = price_list[1].strip()
else:
price = "未找到价格"
else:
price = "未找到价格标签"
            # Extract the winning supplier's name
supplier_name_tag = soup.find("td", class_="code-winningSupplierName")
if supplier_name_tag:
supplier_name = supplier_name_tag.text.strip()
else:
supplier_name = ""
            # Extract the winning supplier's address
supplier_addr_tag = soup.find("td", class_="code-winningSupplierAddr")
if supplier_addr_tag:
supplier_addr = supplier_addr_tag.text.strip()
else:
supplier_addr = ""
            # Extract the project contact person
contact_person_tag = soup.find("samp", class_="code-00010")
if contact_person_tag:
contact_person = contact_person_tag.text.strip()
else:
contact_person = ""
            # Extract the contact phone number
contact_phone_tag = soup.find("samp", class_="code-00011")
if contact_phone_tag:
contact_phone = contact_phone_tag.text.strip()
else:
contact_phone = ""
            return {
'地市': entry.get('districtName'),
'公布时间': entry.get('publishDate'),
'项目编号': article_data.get('projectCode'),
'项目名称': article_data.get('projectName'),
'报价': price,
'中标供应商名称': supplier_name,
'中标供应商地址': supplier_addr,
'项目联系人': contact_person,
'联系电话': contact_phone,
'详细URL': url
            }
else:
print("No data available for article", article_id)
else:
print("No data available or request unsuccessful for article", article_id)
def main():
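    """Prompt for a keyword, page through the announcement listing, process each hit
    concurrently, and save the collected rows to 中标信息.xlsx."""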
url = "http://www.ccgp-guangxi.gov.cn/portal/category"
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Content-Type': 'application/json;charset=UTF-8',
'Cookie': '_zcy_log_client_uuid=a9da9e80-e94b-11ee-9fa8-078349691247', # Replace with your actual cookie
'Host': 'www.ccgp-guangxi.gov.cn',
'Origin': 'http://www.ccgp-guangxi.gov.cn',
'Referer': 'http://www.ccgp-guangxi.gov.cn/site/category?parentId=66485&childrenCode=ZcyAnnouncement&utm=luban.luban-PC-38893.959-pc-websitegroup-navBar-front.5.a9db88e0e94b11ee9fa8078349691247',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest'
}
keyword = input("请输入项目关键词:")
data = {
"pageNo": 1,
"pageSize": 30, # Adjust the page size as needed
"categoryCode": "ZcyAnnouncement4005",
"keyword":keyword
}
entries = []
page_no = 1
max_pages = 3 # Change this to the maximum number of pages you want to crawl
while page_no <= max_pages:
data['pageNo'] = page_no
result = fetch_data(url, data, headers)
if result and result.get('success'):
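            # The list of announcements sits under result.data.data in the response payload.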
result_data = result.get('result', {}).get('data', {}).get('data', [])
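            # Fetch and parse the detail pages concurrently instead of one at a time.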
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(process_entry, entry, headers) for entry in result_data]
                for future in futures:
                    row = future.result()  # avoid shadowing the page-level `result`
                    if row:
                        entries.append(row)
page_no += 1
else:
print("No data available or request unsuccessful.")
break
if entries:
df = pd.DataFrame(entries)
df.to_excel('中标信息.xlsx', index=False)
print("Excel文件保存成功。")
else:
print("没有数据可保存到 Excel 文件中。")
input("按 Enter 键退出...")
if __name__ == "__main__":
main()