Using urllib

urlopen

The urlopen() signature

urlopen(
url,
data=None,
[timeout,]*,
cafile=None,
capath=None,
cadefault=False,
context=None
)

Simulating a POST form submission with urlopen

import urllib.parse
import urllib.request

url="http://httpbin.org/post"
data=bytes(urllib.parse.urlencode({'word':'hello'}),encoding='utf-8')
response=urllib.request.urlopen(url,data=data,timeout=1)
print(response.read().decode('utf-8'))

Request

The urllib.request.Request() signature

urllib.request.Request(
url,
data=None,
headers={},
origin_req_host=None,
unverifiable=False,
method=None
)

Basic usage (changing the User-Agent)

import urllib.request

url="http://httpbin.org/post"
headers={
    'User-Agent':'***'
}
request=urllib.request.Request(url=url,headers=headers,method='POST')
response=urllib.request.urlopen(request)
print(response.read().decode('utf-8'))

Using a proxy

from urllib import request,error
if __name__ == '__main__':
    url = "http://httpbin.org"
    proxy = {'http': '218.75.102.198:8000'}
    proxy_handler = request.ProxyHandler(proxy)
    opener = request.build_opener(proxy_handler)
    request.install_opener(opener)
    try:
        rsp = request.urlopen(url)
        print(rsp.status)
    except error.URLError as e:
        print(e)
    except Exception as e:
        print(e)

If this errors out, the proxy itself is probably the problem.

HTTP basic authentication

  • 1. HTTPPasswordMgrWithDefaultRealm() instantiates a username/password manager
  • 2. add_password() adds the credentials
  • 3. HTTPBasicAuthHandler() builds the auth handler
  • 4. opener.open() sends the request...
import urllib.request

url="http://tieba.baidu.com"
user='test_user'
password='test_password'
pwdmgr=urllib.request.HTTPPasswordMgrWithDefaultRealm()
pwdmgr.add_password(None,url,user,password)
auth_handler=urllib.request.HTTPBasicAuthHandler(pwdmgr)
opener=urllib.request.build_opener(auth_handler)
response=opener.open(url)
print(response.read().decode('utf-8'))

This didn't really seem to work...

Cookie handling

  • Instantiate a cookie container with CookieJar
  • Build a handler with HTTPCookieProcessor
  • Then open the URL with the opener
import http.cookiejar
import urllib.request

url="http://tieba.baidu.com"
fileName='cookie.txt'

cookie=http.cookiejar.CookieJar()
handler=urllib.request.HTTPCookieProcessor(cookie)
opener=urllib.request.build_opener(handler)
response=opener.open(url)

f=open(fileName,'a')
for item in cookie:
    f.write(item.name+"="+item.value+'\n')
f.close()

This requests the cookies and saves them to a file.

HTTPResponse

Attributes: msg, version, status, reason, debuglevel, closed

Methods: read(), readinto(), getheader(name), getheaders(), fileno()
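
A quick sketch showing a few of these members in use (httpbin.org is just an example endpoint):

import urllib.request

response = urllib.request.urlopen('http://httpbin.org/get')
print(response.status)                      # e.g. 200
print(response.getheader('Content-Type'))   # a single header
print(response.getheaders())                # list of (name, value) tuples
print(response.read().decode('utf-8'))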

Error handling

URLError
import urllib.request
import urllib.error

url="http://www.google.com"

try:
    response=urllib.request.urlopen(url)
    print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
    print('reason: ' + str(e.reason) + '\n')

At first the reason wouldn't print; e.reason is not always a plain string, so concatenating it directly can itself fail (hence the str() above).

import urllib.request
import urllib.error

url="http://www.google.com"

try:
    response=urllib.request.urlopen(url)
except urllib.error.URLError as e:
#    print('code: ' + e.code + '\n')
    print(e.reason)
 #   print('headers: ' + e.headers + '\n')

This one worked when the network was disconnected...

HTTPError
import urllib.request
import urllib.error

url="http://www.google.com"

try:
    response=urllib.request.urlopen(url)
    print(response.read().decode('utf-8'))
except urllib.error.HTTPError as e:
    print('code: ' + str(e.code) + '\n')
    print('reason: ' + str(e.reason) + '\n')
    print('headers: ' + str(e.headers) + '\n')

Same problem here; e.code is an int and e.headers is an HTTPMessage, so both need str() before concatenation. Note also that an unreachable google.com tends to raise URLError (e.g. a timeout), which this handler does not catch.

The requests library

requests.get()

res=requests.get(url,params=None,**kwargs)
import requests

r=requests.get("http://www.baidu.com")
print(r.status_code)
print(r.encoding)
print(r.apparent_encoding)
print(r.text)

Exception handling

import requests

try:
    r=requests.get("http://www.baidu.com",timeout=30)
    r.raise_for_status()
    r.encoding=r.apparent_encoding
    print(r.text)
except requests.RequestException:
    print("Wrong!")

requests.head()

import requests

r=requests.head("http://www.baidu.com",timeout=30)
print(r.headers)

requests.post()

import requests

payload={"key1":"value1","key2":"value2"}
header={"User-Agent":"***"}
r=requests.post("http://httpbin.org/post",data=payload,headers=header)
print(r.text)
r=requests.post("http://httpbin.org/post",data='helloworld')
print(r.text)

Other operations

>w<

Setting a proxy in requests

import requests

proxies = {
    "http": "http://218.75.102.198:8000",
    "https": "https://218.75.102.198:8000",
}
response = requests.get("http://www.baidu.com", proxies=proxies,verify=False)
print(response.status_code)

Regular expressions (re)

re.match: matches only from the beginning of the string

re.search: returns the first successful match anywhere in the string

Arguments: (pattern, string, flags=0); the r prefix on the pattern marks a raw string, so backslashes are not treated as escape characters

group() returns the value captured by each pair of parentheses

re.sub: string substitution

re.compile: compiles a regular expression; the compiled object's findall() returns all matches
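
A small sketch tying these functions together:

import re

s = 'tel: 010-12345678, backup 021-87654321'
m = re.search(r'(\d{3})-(\d{8})', s)
if m:
    print(m.group())               # '010-12345678'
    print(m.group(1))              # '010'
print(re.match(r'tel', s))          # matches, because the string starts with 'tel'
print(re.sub(r'\d', '*', s))        # every digit replaced by '*'
pattern = re.compile(r'\d{3}-\d{8}')
print(pattern.findall(s))           # ['010-12345678', '021-87654321']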

XPATH

emmm

Try 1: fetching the ygdy8.com (阳光电影) homepage HTML

import requests

headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:98.0) Gecko/20100101 Firefox/98.0'
}
url='http://www.ygdy8.com/index.html'
print(headers)
req=requests.get(url,headers=headers)
req.encoding='gbk'
print(req.text)

Try 2: submitting a Baidu search keyword

import requests

try:
    get={'wd':'python'}
    r=requests.get('http://www.baidu.com/s',params=get)
    r.raise_for_status()
    r.encoding=r.apparent_encoding
    print(r.text)
except requests.RequestException:
    print('Failed')

AJAX

Overview

Ajax loads data asynchronously without refreshing the whole page, so only part of the page gets updated.

How it works

1. Sending the request

Create an XMLHttpRequest object

var xmlhttp;
if (window.XMLHttpRequest)
{
    //  code path for IE7+, Firefox, Chrome, Opera, Safari
    xmlhttp=new XMLHttpRequest();
}
else
{
    // code path for IE6 and IE5
    xmlhttp=new ActiveXObject("Microsoft.XMLHTTP");
}

GET requests generally have a limit on how much data they can carry,

while POST is slower.

2. Parsing the response

Use the response returned on the xmlhttp object

3. Rendering the page

e.g. via document.getElementById().innerHTML

Analyzing Ajax requests

Ajax requests show up with type xhr in the browser's Network panel

Simulating the request in Python

Take https://data.variflight.com/analytics/CodeQuery as an example.

Under the Network tab, find the airportCode request; its Headers pane shows the Request URL and that the method is POST.

The Preview pane shows the returned result.

import requests
import json

def get_data(key,page):
    url='https://data.variflight.com/analytics/Codeapi/airportCode'
    data={
        "key":key,
        "page":page
    }
    res=requests.request("post",url,data=data)
    return res.text

def get_parse(data):# parse the JSON string into Python data
    return json.loads(data)

data=get_data('杭州',0)
apt_info=get_parse(data)
print(apt_info["data"])

About Python's json module

An attempt at scraping Weibo

import requests
import json

headers = {
    'Host': 'm.weibo.cn',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
    'X-Requested-With': 'XMLHttpRequest',
}

def get_data():
    data={
        'containerid': '102803',
        'openApp': '0',
        'sinceid': '1'
    }
    url='https://m.weibo.cn/api/container/getIndex'
    res=requests.request(method='GET',url=url,params=data,headers=headers)
    return res.text

def get_parse(data):# parse the JSON string into Python data
    return json.loads(data)

data=get_data()
apt_info=get_parse(data)
for i in range(1,10):
    print(apt_info['data']['cards'][i]['mblog']['text'])

Scraping dynamically rendered pages

selenium

Selenium can grab the page source exactly as the browser currently renders it: if you can see it, you can scrape it.

Selenium locator strategies

Elements can be located by id, name, class name, tag name, link text, partial link text, xpath, or css selector.
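
A quick sketch of the same element (Baidu's search box, used again in later examples) located a few different ways:

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get('http://www.baidu.com')

# the same input box found via several locator strategies
driver.find_element(By.ID, 'kw')
driver.find_element(By.NAME, 'wd')
driver.find_element(By.CSS_SELECTOR, '#kw')
driver.find_element(By.XPATH, '//input[@id="kw"]')
driver.quit()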

Controlling the browser with Selenium

Setting the window size
from selenium import webdriver

browser = webdriver.Chrome()
browser.get('http://www.baidu.com')

print("480*800")
browser.set_window_size(480,800)
Back, forward, and refresh
from selenium import webdriver
import time

browser = webdriver.Chrome()

browser.set_page_load_timeout(2)# page load timeout
browser.set_script_timeout(2)# async JS execution timeout


first_url="http://www.baidu.com"
print("now access %s"%(first_url))
browser.get(first_url)
time.sleep(2)
try:
    second_url="http://news.baidu.com"
    print("now access %s"%(second_url))
    browser.get(second_url)
except:
    print("Too Slow")
time.sleep(2)
print("back to %s"%(first_url))
browser.back()
browser.refresh()# refresh the page
time.sleep(2)
try:
    print("forward to %s"%(second_url))
    browser.forward()
except:
    print("Too Slow")
Some common element methods

clear() clears the text

send_keys(value) simulates keyboard input

click() simulates a single click

submit() submits a form (less widely applicable than click())

size returns the element's dimensions

text returns the element's text

get_attribute(name): gets an attribute value

is_displayed(): returns whether the element is visible to the user

from selenium import webdriver
import time

browser = webdriver.Chrome()

browser.set_page_load_timeout(2)
browser.set_script_timeout(2)

first_url="http://www.baidu.com"
print("now access %s"%(first_url))
browser.get(first_url)
time.sleep(2)

search=browser.find_element_by_id("kw")

search.send_keys("Try")
time.sleep(2)
search.clear()

search.send_keys("Go")
time.sleep(2)

#search.submit()

Click=browser.find_element_by_id("su")

print(Click.get_attribute("value"))

Click.click()
Mouse and keyboard events

ActionChains class methods

from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time

driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)
above=driver.find_element(by="id", value="s-usersetting-top")
ActionChains(driver).move_to_element(above).perform()

# hover the mouse over the element

It seems the link text sits inside a span, so find_element_by_link_text can't be used directly for it.

from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time

driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)

news=driver.find_element(by="link text",value="新闻")
action = ActionChains(driver)
action.move_to_element(news)
action.context_click()
action.perform()

send_keys(Keys.BACK_SPACE) Backspace

send_keys(Keys.SPACE) Space

send_keys(Keys.TAB) Tab

send_keys(Keys.ESCAPE) Escape (Esc)

send_keys(Keys.ENTER) Enter

send_keys(Keys.CONTROL,'a') Select all (Ctrl+A)

send_keys(Keys.CONTROL,'c') Copy (Ctrl+C)

send_keys(Keys.CONTROL,'x') Cut (Ctrl+X)

send_keys(Keys.CONTROL,'v') Paste (Ctrl+V)

send_keys(Keys.F1) F1 key

send_keys(Keys.F12) F12 key

send_keys(Keys.LEFT) Move the cursor left

send_keys(Keys.SHIFT, Keys.RIGHT) Extend the selection one character to the right

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
import time

driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)

kw=driver.find_element(by="id",value="kw")
kw.send_keys("seleniumm")
kw.send_keys(Keys.BACK_SPACE)
kw.send_keys(Keys.LEFT*3)
time.sleep(2)
kw.send_keys(Keys.SHIFT,Keys.RIGHT*3)
Getting assertion info (title, current URL)
# coding=utf-8
from selenium import webdriver
import time

driver = webdriver.Chrome()
driver.get("http://bing.com")
print("--------搜索以前--------")

title=driver.title
print(title)

now_url=driver.current_url
print(now_url)

kw=driver.find_element(by="class name",value="sb_form_q")
kw.send_keys("nba")

kw.submit()

time.sleep(1)

print("--------搜索以后--------")
title=driver.title
print(title)

now_url=driver.current_url
print(now_url)

num=driver.find_element(by="id",value="b_tween")
print(num.text)

Still not very comfortable with find_element_by_class_name...

Element waits
Explicit waits
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver=webdriver.Chrome()
driver.get("https://www.baidu.com")

element=WebDriverWait(driver,5,0.5).until(
    EC.presence_of_element_located((By.ID,"kw"))
)

element.send_keys('selenium')
element.submit()

WebDriverWait(driver,timeout,poll_frequency=0.5,ignored_exceptions=None)

It is generally used with until() or until_not(), which keep polling until the condition returns True (or, for until_not, False).

Implicit waits

An implicit wait is not tied to any particular element on the page.

When the script reaches an element lookup, it proceeds if the element can be located; otherwise it keeps retrying until the timeout expires.

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from time import ctime

driver=webdriver.Chrome()

driver.implicitly_wait(10)

driver.get("https://www.baidu.com")

try:
    print(ctime())
    driver.find_element(by=By.ID,value="kw")
except NoSuchElementException as e:
    print(e)
finally:
    print(ctime())
    driver.quit()
Switching between frames

Leaving this as a TODO... haven't gotten it working yet. A placeholder sketch is below for when I come back to it.
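
A minimal, untested sketch of the usual pattern (the page URL and frame index here are placeholders):

from selenium import webdriver

driver = webdriver.Chrome()
driver.get('http://example.com')   # placeholder page; assumes it contains an iframe

# switch into an iframe by index (a name, id, or WebElement also works)
driver.switch_to.frame(0)
# ... locate elements inside the frame here ...

# switch back to the top-level document
driver.switch_to.default_content()
driver.quit()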

Selecting from a drop-down
from selenium import webdriver
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.by import By
from time import sleep

driver=webdriver.Chrome()
driver.get("https://cn.bing.com/account/general?ru=https%3a%2f%2fcn.bing.com%2f&FORM=O2HV46&sh=0")
sleep(1)

sel=driver.find_element(by=By.XPATH,value="//select[@id='rpp']")
Select(sel).select_by_value("15")

Still not very good with XPath...

Executing JavaScript
from selenium import webdriver
from time import sleep

driver=webdriver.Chrome()
driver.get("http://www.baidu.com")

driver.set_window_size(500,500)
sleep(3)
js="window.scrollTo(100,450);"
driver.execute_script(js)
sleep(3)

The key call here is driver.execute_script(js).

Screenshots

driver.get_screenshot_as_file saves a screenshot of the current window.
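
A one-line sketch of using it:

from selenium import webdriver

driver = webdriver.Chrome()
driver.get('http://www.baidu.com')
driver.get_screenshot_as_file('baidu.png')   # write the current window to baidu.png
driver.quit()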

Headless mode

Add the following:

from selenium.webdriver.chrome.options import Options
chrome_options=Options()
chrome_options.add_argument('--headless')
driver = webdriver.Chrome(options=chrome_options)

Not knowing XPath feels pretty bad... will study it tomorrow.

Splash

Not sure what it's useful for yet...

Some scripts

function main(splash, args)
  splash:go("https://www.taobao.com")
  splash:wait(0.5)
  local title=splash:evaljs("document.title")
  return {title=title}
end

Some notes on using Splash

Attempted a simulated login to CSDN

Failed...

Setting up and using proxies

Setting a proxy with urllib

import urllib.request
import urllib.error

url="http://httpbin.org/get"

proxies={
    'http':'http://106.54.128.253:999'
}

proxy_support=urllib.request.ProxyHandler(proxies)
opener=urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
try:
    response=urllib.request.urlopen(url,timeout=1)
    print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
    print(e)

Setting a proxy with requests

import requests

url="http://httpbin.org/get"

proxies={
    'http':'http://106.54.128.253:999'
}

s = requests.session()
s.keep_alive = False
response=s.get(url=url,proxies=proxies,timeout=1)
print(response.text)

Earlier experiments hit a "Max retries exceeded with url" error.

Workarounds:

1. Increase the number of retries
requests.adapters.DEFAULT_RETRIES = 5

2. Close extra connections
requests is built on urllib3, whose HTTP connections are keep-alive by default; set keep_alive to False to turn that off.
How:
s = requests.session()
s.keep_alive = False

Setting a proxy in Selenium

from selenium import webdriver

chromeOptions=webdriver.ChromeOptions()
chromeOptions.add_argument("--proxy-server=http://106.54.128.253:999")
chromeOptions.add_argument("--headless")
browser=webdriver.Chrome(options=chromeOptions)

browser.get("http://httpbin.org/get")
print(browser.page_source)
browser.quit()

Multiple IPs?

Switching randomly among several proxy IPs:

import random

iplist=['xxx','xxxx']
proxies={'http':random.choice(iplist)}

Proxy pools

Building a proxy pool

import requests
from bs4 import BeautifulSoup

def get_ips():
    url="https://www.kuaidaili.com/free/"
    header={
        'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
    }
    res=requests.get(url,headers=header)
    bs=BeautifulSoup(res.text,'html.parser')
    res_list=bs.find_all('tr')
    ip_list=[]
    for link in res_list:
        tds=link.find_all('td')
        if tds:
            ip_list.append({"ip":tds[0].text,"port":tds[1].text})
    return ip_list
ip_list=get_ips()
for item in ip_list:
    print(item)

Validating the proxy pool

import socket

import requests
from bs4 import BeautifulSoup

def get_ips():
    url="https://www.kuaidaili.com/free/"
    header={
        'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
    }
    res=requests.get(url,headers=header)
    bs=BeautifulSoup(res.text,'html.parser')
    res_list=bs.find_all('tr')
    ip_list=[]
    for link in res_list:
        tds=link.find_all('td')
        if tds:
            ip_list.append({"ip":tds[0].text,"port":tds[1].text})
    return ip_list

def ip_pool():
    socket.setdefaulttimeout(2)
    ip_list=get_ips()
    for x in ip_list:
        print(x)
    print("="*50)
    ip_pool_list=[]
    id=0
    print(len(ip_list))
    for x in ip_list:
        id=id+1
        print(id)
        proxy=x["ip"]+":"+x["port"]
        proxies={'http':proxy}
        try:
            res=requests.get("http://www.baidu.com",proxies=proxies,timeout=2)
            ip_pool_list.append(proxy)
        except Exception as ex:
            continue
    return ip_pool_list

ip=ip_pool()
for x in ip:
    print(x)

Using the proxy pool

import socket
import time
import random

import requests
from bs4 import BeautifulSoup

def get_ips():
    url="https://www.kuaidaili.com/free/"
    header={
        'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
    }
    res=requests.get(url,headers=header)
    bs=BeautifulSoup(res.text,'html.parser')
    res_list=bs.find_all('tr')
    ip_list=[]
    for link in res_list:
        tds=link.find_all('td')
        if tds:
            ip_list.append({"ip":tds[0].text,"port":tds[1].text})
    return ip_list

def ip_pool():
    socket.setdefaulttimeout(2)
    ip_list=get_ips()
    for x in ip_list:
        print(x)
    print("="*50)
    ip_pool_list=[]
    id=0
    print(len(ip_list))
    for x in ip_list:
        id=id+1
        print(id)
        proxy=x["ip"]+":"+x["port"]
        proxies={'http':proxy}
        try:
            res=requests.get("http://www.baidu.com",proxies=proxies,timeout=2)
            ip_pool_list.append(proxy)
        except Exception as ex:
            continue
    return ip_pool_list

iplist=ip_pool()
for T in range(1,11):
    proxies={'http':random.choice(iplist)}
    response=requests.get("http://httpbin.org/get",proxies=proxies)
    print(response.text)
    time.sleep(1)

Dynamic redialing

Leaving this for later...

CAPTCHA recognition

Simple recognition with OCR

import pytesseract
from PIL import Image

print(pytesseract.image_to_string(Image.open("code.jpg"),lang="eng",config="--psm 7"))

Some preprocessing

Binarization and recoloring

def test(path):
    img=Image.open(path)
    w,h=img.size
    for x in range(w):
        for y in range(h):
            r,g,b=img.getpixel((x,y))
            if 0<=r<=100 and 0<=g<=100 and 0<=b<=100:
                img.putpixel((x,y),(255,255,255))
            if 150<=r<=255 and 0<=g<=50 and 0<=b<=50:
                img.putpixel((x,y),(0,0,0))
    img=img.convert('L').point([0]*150+[1]*(256-150),'1')
    return img

Noise reduction

t2val={}
# G: binarization threshold for the image
def twoValue(image,G):
    for y in range(0,image.size[1]):
        for x in range(0,image.size[0]):
            g=image.getpixel((x,y))
            if g>G :
                t2val[(x,y)]=1
            else:
                t2val[(x,y)]=0

# N: noise threshold (minimum matching neighbours to keep a pixel), 0 < N < 8
# Z: number of denoising passes

def clearNoise(image,N,Z):
    for i in range(0,Z):
        t2val[(0,0)]=1
        t2val[(image.size[0]-1,image.size[1]-1)]=1
        for x in range(1,image.size[0]-1):
            for y in range(1,image.size[1]-1):
                nearDots=0
                L=t2val[(x,y)]
                if L==t2val[(x-1,y-1)]:
                    nearDots+=1
                if L==t2val[(x-1,y)]:
                    nearDots+=1
                if L==t2val[(x-1,y+1)]:
                    nearDots+=1
                if L==t2val[(x,y-1)]:
                    nearDots+=1
                if L==t2val[(x,y+1)]:
                    nearDots+=1
                if L==t2val[(x+1,y-1)]:
                    nearDots+=1
                if L==t2val[(x+1,y)]:
                    nearDots+=1
                if L==t2val[(x+1,y+1)]:
                    nearDots+=1

                if nearDots<N:
                    t2val[(x,y)]=1

Saving the image and recognizing it

def saveImage(filename,size):
    image=Image.new("1",size)
    draw=ImageDraw.Draw(image)
    for x in range(0,size[0]):
        for y in range(0,size[1]):
            draw.point((x,y),t2val[(x,y)])
    image.save(filename)
def recognize_captcha(img_path):
    im=Image.open(img_path)
    num=pytesseract.image_to_string(im)
    return num

Full code

import pytesseract
from PIL import Image,ImageDraw

def test(path):
    img=Image.open(path)
    w,h=img.size
    for x in range(w):
        for y in range(h):
            r,g,b=img.getpixel((x,y))
            if 0<=r<=100 and 0<=g<=100 and 0<=b<=100:
                img.putpixel((x,y),(255,255,255))
            if 150<=r<=255 and 0<=g<=50 and 0<=b<=50:
                img.putpixel((x,y),(0,0,0))
    img=img.convert('L').point([0]*150+[1]*(256-150),'1')
    return img

t2val={}
# G: binarization threshold for the image
def twoValue(image,G):
    for y in range(0,image.size[1]):
        for x in range(0,image.size[0]):
            g=image.getpixel((x,y))
            if g>G :
                t2val[(x,y)]=1
            else:
                t2val[(x,y)]=0

# N: noise threshold (minimum matching neighbours to keep a pixel), 0 < N < 8
# Z: number of denoising passes

def clearNoise(image,N,Z):
    for i in range(0,Z):
        t2val[(0,0)]=1
        t2val[(image.size[0]-1,image.size[1]-1)]=1
        for x in range(1,image.size[0]-1):
            for y in range(1,image.size[1]-1):
                nearDots=0
                L=t2val[(x,y)]
                if L==t2val[(x-1,y-1)]:
                    nearDots+=1
                if L==t2val[(x-1,y)]:
                    nearDots+=1
                if L==t2val[(x-1,y+1)]:
                    nearDots+=1
                if L==t2val[(x,y-1)]:
                    nearDots+=1
                if L==t2val[(x,y+1)]:
                    nearDots+=1
                if L==t2val[(x+1,y-1)]:
                    nearDots+=1
                if L==t2val[(x+1,y)]:
                    nearDots+=1
                if L==t2val[(x+1,y+1)]:
                    nearDots+=1

                if nearDots<N:
                    t2val[(x,y)]=1

def saveImage(filename,size):
    image=Image.new("1",size)
    draw=ImageDraw.Draw(image)
    for x in range(0,size[0]):
        for y in range(0,size[1]):
            draw.point((x,y),t2val[(x,y)])
    image.save(filename)
def recognize_captcha(img_path):
    im=Image.open(img_path)
    num=pytesseract.image_to_string(im)
    return num

if __name__=='__main__':
    path="code.jpg"
    im=test(path)
    path=path.replace('jpg','png')
    im.save(path)
    image=Image.open(path).convert("L")
    twoValue(image,100)
    clearNoise(image,2,5)
    img_path="code.jpeg"
    saveImage(img_path,image.size)
    print (recognize_captcha(img_path))
    print(pytesseract.image_to_string(Image.open("code.jpeg"),lang="eng",config="--psm 7"))

I think the preprocessing looks decent... but it still can't recognize the text.

GG

CAPTCHA recognition with a CNN

An interesting project

Spent a very long time setting up the TensorFlow environment...

At least I learned a bit about Conda

Tutorial

Slider CAPTCHA

import random

from selenium import webdriver
import time
from selenium.webdriver.common.action_chains import ActionChains

driver=webdriver.Chrome()
url="http://127.0.0.1:8890/test.html"
driver.get(url)
action=ActionChains(driver)
time.sleep(1)
element=driver.find_element(by="id",value="nc_1_n1z")
action.click_and_hold(element).perform()
distance=260
while distance > 0:
    if distance>10:
        span=random.randint(5,8)
    else:
        span=random.randint(2,3)
    action.move_by_offset(span,0)
    distance-=span
    time.sleep(random.randint(10,50)/1000)
action.move_by_offset(distance,1).perform()
action.release().perform()

But Aliyun's anti-bot detection still caught it...

Jigsaw (puzzle-piece) CAPTCHA

Approach

A jigsaw CAPTCHA is essentially three images stacked on top of each other.

1. Save the three images

2. Compare the two incomplete images to compute the gap distance

3. Simulate dragging the slider to that position

Haven't really figured this out yet... a rough sketch of step 2 is below.
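
A rough, untested sketch of step 2, assuming the two background screenshots (with and without the gap) were already saved under the hypothetical names bg.png and full.png:

from PIL import Image

def find_gap(bg_path, full_path, threshold=60):
    bg = Image.open(bg_path).convert('L')
    full = Image.open(full_path).convert('L')
    w, h = bg.size
    for x in range(w):
        for y in range(h):
            # the first column where the two images differ noticeably
            # is treated as the left edge of the gap
            if abs(bg.getpixel((x, y)) - full.getpixel((x, y))) > threshold:
                return x
    return -1

print(find_gap('bg.png', 'full.png'))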

Scraping app data

Fiddler Everywhere

Can't do mobile packet capture for now... the campus network only allows one account to be logged in at a time...

Data storage

Text file storage

file = open('filename','a',encoding='utf-8') # 'a' means append
file.write('')
file.close()
with open('filename','a',encoding='utf-8') as file:
    file.write('')

JSON file storage

JSON is a lightweight, language-independent data-interchange format.

In Python, JSON data maps to list and dict objects.

The json module provides four functions:

1. dumps: converts data to a JSON string

2. dump: converts data to a JSON string and writes it to a file

3. loads: converts a JSON string back into Python data

4. load: reads a file and converts its JSON content into Python data

dict -> string -> dict

import json

test_dict={'bigberg':[7600,{1:[['iPhone',6300],['Bike',800],['shirt',300]]}]}
print(test_dict)
print(type(test_dict))
print("="*50)
json_str=json.dumps(test_dict)
print(json_str)
print(type(json_str))
print("="*50)
new_dict=json.loads(json_str)
print(new_dict)
print(type(new_dict))

Writing data to a JSON file

import json

new_dict={"name":"zk","age":20,"gender":"m"}
with open("record.json","w",encoding='utf-8')as f:
    json.dump(new_dict,f)
    print("Finished!")

CSV file storage

CSV stores tabular data, with the values in each record separated by commas.

Writing CSV data

import csv

c=open("test.csv","w")
writer=csv.writer(c)
writer.writerow(['name','address','city','state'])
c.close()

Reading CSV data

import csv

c =open("test.csv","r")
reader =csv.reader(c)
for line in reader:
    print(line[0],line[1],line[2],line[3])
c.close()

Excel file storage

Writing a file

import xlwt

def set_style(name,height,bold=False):
    style =xlwt.XFStyle()
    font =xlwt.Font()
    font.name=name
    font.bold=bold
    font.colour_index=4
    font.height=height
    style.font=font
    return style

def write_excel(path):
    workbook=xlwt.Workbook(encoding="utf-8") # instantiate a Workbook
    data_sheet=workbook.add_sheet("demo") # create a sheet
    row0=[u'字段名称',u'大致时段','CRNTI','CELL-ID']
    row1=[u'测试','15:50:33-15:52:14',22706,4190202]
    for i in range(len(row0)):
        data_sheet.write(0, i, row0[i], set_style('Times New Roman', 220, True))
        data_sheet.write(1, i, row1[i], set_style('Times New Roman', 220, True))
        # data_sheet.write(row, column, value)
    workbook.save(path) # save the file

if __name__=='__main__':
    path='demo.xls'
    write_excel(path)
    print("Success!")

Reading a file

import xlrd

Workbook =xlrd.open_workbook('demo.xls')

sheet_names =Workbook.sheet_names()
sheet1 =Workbook.sheet_by_name('demo')
#sheet1 =Workbook.sheet_by_index(0)
rows=sheet1.row_values(1)
cols10=sheet1.col_values(1)
print('rows',rows)
print('cols10',cols10)

MySQL storage

Creating a database connection

import mysql.connector

mydb=mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="root"
)
print(mydb)

Inserting data

import mysql.connector

mydb=mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="root"
)
mycursor=mydb.cursor()

sql="USE pylogin"
mycursor.execute(sql)
sql="create "


sql="INSERT INTO test_01 (id,name,age) VALUES (%s,%s,%s)"
val=[
    (2,"张三",12),
    (3,"李四",13),
    (4,"王五",23),
    (5,"麻子",35)
]
for x in val:
    mycursor.execute(sql,x)
mydb.commit()

Querying data

import mysql.connector

mydb=mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="root"
)
mycursor=mydb.cursor()

sql="USE pylogin"
mycursor.execute(sql)


sql="SELECT * FROM test_01;"
mycursor.execute(sql)
myresult=mycursor.fetchall()

for x in myresult:
    print(x)

Storing data in MongoDB

Reportedly easy on server performance

Install MongoDB

Inserting a single document

import pymongo

myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]

mydict={"name":"张三","age":"23","gender":"男"}

x=mycol.insert_one(mydict)
print(x.inserted_id)

Inserting multiple documents

import pymongo

myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]

mydict=[
    {"name":"张三","age":"23","gender":"男"},
    {"name":"李四","age":"24","gender":"男"},
    {"name":"王五","age":"25","gender":"男"},
    {"name":"麻子","age":"26","gender":"男"}
]

x=mycol.insert_many(mydict)
print(x.inserted_ids)

Querying a single document

import pymongo

myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]

x=mycol.find_one()

print(x)

Querying multiple documents

import pymongo

myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]

for x in mycol.find():
    print(x)

Updating data

import pymongo

myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]

myquery={"name":"张三"}
newvalues={"$set":{"age":"20"}}

mycol.update_many(myquery,newvalues)

for x in mycol.find():
    print(x)

Redis storage

Redis is a data-structure server and is extremely fast.

Install Redis

Start Redis

Redis lists

import redis

r = redis.Redis(host='localhost', port=6379, db=0)
r.lpush("test_list",1)
r.lpush("test_list",2)
r.lpush("test_list",3)

print(r.lpop("test_list"))
print(r.lpop("test_list"))

Redis sets

Backed by a hash: members cannot repeat, and operations are O(1).

import redis

r = redis.Redis(host='localhost', port=6379, db=0)
r.sadd("test_list",1)
r.sadd("test_list",2)
r.sadd("test_list",3)

print(r.scard("test_list"))

PostgreSQL

The psycopg2 driver implements the Python DB-API 2.0 spec and is thread-safe.

Connecting to the database

import psycopg2

conn=psycopg2.connect(
    database="postgres",
    user="postgres",
    password="root",
    host="localhost",
    port=5432
)

cur=conn.cursor()

Inserting data

import psycopg2

conn=psycopg2.connect(
    database="postgres",
    user="postgres",
    password="root",
    host="localhost",
    port=5432
)

cur=conn.cursor()

cur.execute("CREATE TABLE student(id integer,name varchar,sex varchar);")

cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(1,'Aspirin','M'))
cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(2,'Taxol','F'))
cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(3,'Dixheral','M'))

conn.commit()
cur.close()
conn.close()

Querying data

import psycopg2

conn=psycopg2.connect(
    database="postgres",
    user="postgres",
    password="root",
    host="localhost",
    port=5432
)

cur=conn.cursor()

cur.execute("SELECT * FROM student")

result=cur.fetchall()
print(result)

cur.close()
conn.close()

Update and delete

Compared with inserting, only the SQL statement changes; a sketch is below.
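
A minimal sketch reusing the student table from above (the replacement name is made up):

import psycopg2

conn=psycopg2.connect(
    database="postgres",
    user="postgres",
    password="root",
    host="localhost",
    port=5432
)
cur=conn.cursor()

# same %s parameter style as the inserts above
cur.execute("UPDATE student SET name=%s WHERE id=%s", ('Ibuprofen', 1))  # made-up name
cur.execute("DELETE FROM student WHERE id=%s", (3,))

conn.commit()
cur.close()
conn.close()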

Python: scraping article titles from Jianshu

from selenium import webdriver
import time
import pymysql
from lxml import etree

driver=webdriver.Chrome()
driver.get('https://www.jianshu.com')

def load_more(num):
    for x in range(5):
        js="var q=document.documentElement.scrollTop=100000"
        driver.execute_script(js)
        time.sleep(2)
    if num==0:
        time.sleep(4)
    load_more=driver.find_element_by_class_name("load-more")
    load_more.click()

def get_html():
    note_list=driver.find_element_by_class_name("note-list")
    html=note_list.get_attribute("innerHTML")
    return html

def extract_data(content_html):
    html=etree.HTML(content_html)
    title_list=html.xpath('//li//a[@class="title"]/text()')
    abstract_list=html.xpath('//li//a[@class="abstract"]/text()')
    nickname_list=html.xpath('//li//a[@class="nickname"]/text()')
    data_list=[]
    for index,x in enumerate(title_list):
        item={}
        item["title"]=title_list[index]
#        item["abstract"]=abstract_list[index]
 #       item["nickname"]=nickname_list[index]
        print(item)
        data_list.append(item)
    return data_list

def insert_data(sql):
    db=pymysql.connect(
        host="127.0.0.1",
        user="root",password="root",
        database="pylogin",
        charset="utf8")
    try:
        cursor=db.cursor()
        return cursor.execute(sql)
    except Exception as ex:
        print(ex)
    finally:
        db.commit()
        db.close()

for x in range(2):
    print(x)
    load_more(x)
    time.sleep(1)

results=extract_data(get_html())
for item in results:
    print(item)
    sql="insert into jianshu(title) values('%s')"\
        ""%(item["title"])
    insert_data(sql)

There's still a lot I don't fully understand... mainly the XPath part.

Python: automating the campus micro-services health check-in

import re

import requests
from bs4 import BeautifulSoup
from requests_toolbelt.utils import dump
import api
import Student
import json

sess = requests.session()

def download_img(imgurl):
    rsp = sess.get(imgurl)
    if rsp.status_code == 200:
        content = rsp.content
        with open("./captcha.png", "wb+") as f:
            f.write(content)
        return str(content)


def GetExecution(be):
    return be.find(name="input", attrs={"name": "execution"})["value"]

def GETCaptcha(be):
    pattern = re.compile(r"id: '\d+'$",re.MULTILINE)
    ID0 = be.find_all("script",text=pattern)
    ID1 = pattern.findall(str(ID0[0]))[0]
    ID2 = re.findall(r"\d+",ID1)[0]
    CaptchaURL = "http://ua.scu.edu.cn/captcha?captchaId="+ID2
    download_img(CaptchaURL)
    Captcha=api.Get("./captcha.png")
    return Captcha

sess_header = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
}
def Visit():
    LoginUrl = "http://ua.scu.edu.cn/login?service=https%3A%2F%2Fwfw.scu.edu.cn%2Fa_scu%2Fapi%2Fsso%2Fcas-index%3Fredirect%3Dhttps%253A%252F%252Fwfw.scu.edu.cn%252Fncov%252Fwap%252Fdefault%252Fsave"
    res = sess.get(LoginUrl, headers=sess_header)
    return BeautifulSoup(res.text, 'html.parser')

def SetPost(Execution,Captcha):
    POSTDATA = {
        'username': Student.username,
        'password': Student.password,
        'captcha': Captcha,
        'submit': '%E7%99%BB%E5%BD%95',
        'type': 'username_password',
        'execution': Execution,
        '_eventId': 'submit'
    }
    return POSTDATA

def Rush(data):
    URL = "https://ua.scu.edu.cn/login"
    res = sess.post(URL,headers=sess_header,data=data)
    return res.status_code
    # ddd = dump.dump_all(res)
    # print(ddd.decode("utf-8"))

def Daka(data):
    # print(data)

    url = "https://wfw.scu.edu.cn/ncov/wap/default/save"
    res=sess.post(url,data=data)
    mess=json.loads(res.text)
    print(mess["m"])
    # print(sess.cookies)

    # ddd = dump.dump_all(res)
    # print(ddd.decode("utf-8"))

def HealthData():
    data = {
        "zgfxdq": "0",
        "mjry": "0",
        "csmjry": "0",
        "szxqmc": "江安校区",
        "sfjzxgym": "0",
        "jzxgymrq": "",
        "sfjzdezxgym": "0",
        "jzdezxgymrq": "",
        "sfjzdszxgym": "0",
        "jzdszxgymrq": "",
        "uid": "",
        "date": "",
        "tw": "2",
        "sfcxtz": "0",
        "sfyyjc": "0",
        "jcjgqr": "0",
        "jcjg": "",
        "sfjcbh": "0",
        "sfcxzysx": "0",
        "qksm": "",
        "remark": "",
        "address": "四川省成都市双流区西航港街道四川大学江安校区",
        "area": "四川省+成都市+双流区",
        "province": "四川省",
        "city": "成都市",
        "geo_api_info": {
            "type": "complete",
            "position": {"Q": 30.55300374349, "R": 103.99391167534799, "lng": 103.993912, "lat": 30.553004},
            "location_type": "html5",
            "message": "Get+ipLocation+failed.Get+geolocation+success.Convert+Success.Get+address+success.",
            "accuracy": 35,
            "isConverted": "true",
            "status": 1,
            "addressComponent": {
                "citycode": "028",
                "adcode": "510116",
                "businessAreas": [{
                    "name": "白家",
                    "id": "510116",
                    "location": {
                        "Q": 30.562482,
                        "R": 104.006821,
                        "lng": 104.006821,
                        "lat": 30.562482
                    }
                }],
                "neighborhoodType": "",
                "neighborhood": "",
                "building": "",
                "buildingType": "",
                "street": "川大路二段",
                "streetNumber": "1号",
                "country": "中国",
                "province": "四川省",
                "city": "成都市",
                "district": "双流区",
                "towncode": "510116002000",
                "township": "西航港街道"
            },
            "formattedAddress": "四川省成都市双流区西航港街道四川大学江安校区学生西园8舍围合",
            "roads": [],
            "crosses": [],
            "pois": [],
            "info": "SUCCESS"
        },
        "created": "1649433631",
        "sfzx": "1",
        "sfjcwhry": "0",
        "sfcyglq": "0",
        "gllx": "",
        "glksrq": "",
        "jcbhlx": "",
        "jcbhrq": "",
        "sftjwh": "0",
        "sftjhb": "0",
        "fxyy": "",
        "bztcyy": "1",
        "fjsj": "0",
        "sfjchbry": "0",
        "sfjcqz": "",
        "jcqzrq": "",
        "jcwhryfs": "",
        "jchbryfs": "",
        "xjzd": "",
        "szgj": "",
        "sfsfbh": "0",
        "szsqsfybl": "0",
        "sfsqhzjkk": "0",
        "sqhzjkkys": "",
        "sfygtjzzfj": "0",
        "gtjzzfjsj": "",
        "szcs": "",
        "sfjxhsjc": "0",
        "hsjcrq": "",
        "hsjcdd": "",
        "hsjcjg": "0",
        "bzxyy": "",
        "id": "",
        "gwszdd": "",
        "sfyqjzgc": "",
        "jrsfqzys": "",
        "jrsfqzfy": "",
        "szgjcs": "",
        "ismoved": "0"
    }
    return data

def Login():
    count=0
    while count<=5:
        be=Visit()
        Execution=GetExecution(be)
        Captcha=GETCaptcha(be)
        data=SetPost(Execution,Captcha)
        status=Rush(data)
        if status==200:
            return "Success!";
        count+=1
    return "Failed!"

def run():
    print("登录微服务中............")
    if Login() == "Failed!":
        print("Username Or Password Wrong!")
        exit()
    print("登陆成功!正在配置打卡数据................")
    data = HealthData()
    print("配置成功!正在打卡................")
    Daka(data)

if __name__=="__main__":
    run()

Common crawler frameworks

PySpider

Architecture

Components: the Scheduler, the Fetcher, and the Processor.

The whole crawler is watched by a Monitor, and crawl results are handled by the Result Worker.

Doesn't really seem to run on an M1 Mac... the default handler skeleton is sketched below for reference anyway.
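
This is roughly the handler skeleton pyspider generates for a new project (noted from memory, not verified locally):

from pyspider.libs.base_handler import *

class Handler(BaseHandler):
    crawl_config = {}

    @every(minutes=24 * 60)
    def on_start(self):
        # seed URL; replace with the site to crawl
        self.crawl('http://scrapy.org/', callback=self.index_page)

    @config(age=10 * 24 * 60 * 60)
    def index_page(self, response):
        # follow every absolute link on the page
        for each in response.doc('a[href^="http"]').items():
            self.crawl(each.attr.href, callback=self.detail_page)

    def detail_page(self, response):
        return {
            "url": response.url,
            "title": response.doc('title').text(),
        }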

Scrapy

Creating a project

scrapy startproject FirstLab

scrapy.cfg: the project configuration file

FirstLab/: the project's Python module

FirstLab/items.py: the project's item definitions

FirstLab/pipelines.py: the project's pipelines

FirstLab/settings.py: the project's settings

FirstLab/spiders/: the directory that holds spider code

Defining Items

An Item is the container that holds the scraped data.

# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class FirstlabItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    title = scrapy.Field()
    link = scrapy.Field()
    desc = scrapy.Field()
    pass

Writing the spider

A spider class must subclass scrapy.Spider and define these three attributes:

1. name: identifies the spider; the name must be unique

2. start_urls: the list of URLs the spider starts crawling from

3. parse(): a method of the spider that parses the returned data and generates Request objects for any further URLs to process

import scrapy

class DmozSpider(scrapy.Spider):
    name="dmoz"
    allowed_domains=["runoob.com"]
    start_urls=[
        "http://www.runoob.com/xpath/xpath-examples.html",
        "http://www.runoob.com/bootstrap/bootstrap-tutorial.html"
    ]

    def parse(self,response):
        filename=response.url.split("/")[-2]
        with open(filename,'wb') as f:
            f.write(response.body)

Extracting Items

Use Scrapy Selectors.

Trying selectors in the Scrapy shell

scrapy shell "http://www.runoob.com/xpath/xpath-examples.html"

Once the shell loads, a local response variable holding the response data is available.
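
A few things one might try in that shell session (output depends on the page, so treat it as a sketch):

# inside the scrapy shell
response.url                                   # the URL that was fetched
response.xpath('//title/text()').extract()     # page title as a list of strings
for li in response.xpath('//ul/li'):
    print(li.xpath('a/text()').extract(), li.xpath('a/@href').extract())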

Extracting the data

import scrapy

class DmozSpider(scrapy.Spider):
    name="dmoz"
    allowed_domains=["runoob.com"]
    start_urls=[
        "http://www.runoob.com/xpath/xpath-examples.html",
        "http://www.runoob.com/bootstrap/bootstrap-tutorial.html"
    ]

    def parse(self,response):
        for x in response.selector.xpath('//ul/li'):
            print(x.xpath('a/text()').extract())
            print(x.xpath('a/@href').extract())


Leave the noise behind, and go look for life's treasures.