[프로젝트]
Scrapy 활용 네이버 주식 크롤링하기 + Kafka 서버랑 연동
[ 구축 환경 ]
- 구축 환경 : 윈도우
- 구축 윈도우 : Windows10
- 사용 프로그램 : VSCode, spiders, Kafka 관련, Postman, Hiedisql
- 사용 언어 : Pyhton 3.9, RestAPI, SQL
[ 앞으로 할 작업 ]
- 크롤링 작업 (네이버 증권 사이트에서 Scrapy로 크롤링 작업 준비하기)
- Kafka와 연결 작업 (Scrapy 코드 수정)
- Kafka 서버 구성하기
- DB 구축 및 Kafak connect 만들기
- 연동 결과 확인
1. 크롤링 작업하기 (네이버 주식 사이트에서 주식정보 크롤링하기)
[크롤링 작업 과정 요약]
1. 크롤링 목적 및 대상 확인 : 필요한 정보와 정보를 보유하고 있는 웹사이트 확인
2. Scrapy 프로젝트 생성
- scrapy startproject "project_name"
3. 크롤링 대상에 맞게 코드 수정하기
1) Item 파일 작성 : 수집할 데이터의 구조 정의
- 크롤링 대상 게시물들에 대한 게시물, 저작자, 제목, url 등을 item에 저장
2) Spider 파일 작성 : 데이터 수집을 위한 수행 코드를 정의
- name, start URL 지정(start_requests, start_urls), parse(), selector(xpath, css)
3) Pipelines 파일 작성 : 수집된 데이터 처리 방식 정의(파일저장/DB저장/이메일발송 등)
- item에 저장된 데이터를 기반으로 pipe라인에서 데이터 처리방식 정의(db저장, 특별한 저장 규칙)
4) Settings 파일 작성 : 프로젝트 모듈 간 연결 및 기본 설정 정의
- pipelines의 순서 결정, 로그 파일지정 등
4. 프로젝트 실행 : 작성된 소스코드 컴파일 및 실행
- scrapy crwal "bots"
5. 추출 결과 확인 : 프로젝트 최상위 폴더 내 신규 생성된 데이터 파일 확인
1. 크롤링 대상확인하기
1) 대상 : 네이버 주식 사이트
2) URL 주소 : https://finance.naver.com/sise/lastsearch2.nhn
3) 해당 사이트에서 가져올 정보 : 순위, 종목명, 현재가, 전일비, 등략률, 거래량, 시가, 고가, 저가, PER, ROE
2. Scrapy 프로젝트 생성
scrapy startproject "naverfinance"
3. 크롤링 대상에 맞게 코드 수정하기
1) Item 파일 : 수집할 데이터의 구조 정의
> 변경사항 : scrapy 해올 데이터 항목에 대해 처리
[item.py]
import scrapy
class NaverfinanceItem(scrapy.Item):
rank=scrapy.Field()
name=scrapy.Field()
search_ratio=scrapy.Field()
today_price=scrapy.Field()
diff_price=scrapy.Field()
up_down=scrapy.Field()
volumn=scrapy.Field()
siga=scrapy.Field()
goga=scrapy.Field()
jega=scrapy.Field()
PER=scrapy.Field()
ROE=scrapy.Field()
2) Spider 파일 : 데이터 수집을 위한 수행 코드를 정의
[financebots.py]
import scrapy
from naverfinance.items import NaverfinanceItem
from bs4 import BeautifulSoup
class FinancebotsSpider(scrapy.Spider):
name = 'financebots'
allowed_domains = ['naver.com']
start_urls = ['https://finance.naver.com/sise/lastsearch2.nhn']
def parse(self, response):
soup=BeautifulSoup(response.text,'html.parser')
top30s=soup.select('table.type_5 > tr > td')
finance_top30s2=list()
ini=1
for i in range(30): #top 30
finance_top30s2.append([])
for j in range(12): # 항목 12개
if top30s[ini].text.strip()=='':
ini=ini+3
finance_top30s2[i].append(top30s[ini].text.strip())# top10 마다 생기는 빈공간 제거
ini=ini+1
finance_top30s=list()
for i in range(30):
item=NaverfinanceItem()
for j in range(12):
item['rank']=finance_top30s2[i][0]
item['name']=finance_top30s2[i][1]
item['search_ratio']=finance_top30s2[i][2]
item['today_price']=finance_top30s2[i][3]
if finance_top30s2[i][5][:1] == '-':
item['diff_price']='-'+finance_top30s2[i][4]
elif finance_top30s2[i][5][:1] == '+':
item['diff_price']='+'+finance_top30s2[i][4]
else :
item['diff_price']=finance_top30s2[i][4]
item['up_down']=finance_top30s2[i][5]
item['volumn']=finance_top30s2[i][6]
item['siga']=finance_top30s2[i][7]
item['goga']=finance_top30s2[i][8]
item['jega']=finance_top30s2[i][9]
item['PER']=finance_top30s2[i][10]
item['ROE']=finance_top30s2[i][11]
finance_top30s.append(item)
return finance_top30s
3) settings 파일 : 프로젝트 모듈 간 연결 및 기본 설정 정의
> 크롤링한 결과를 csv파일로 저장하도록 설정
> 이거는 Kafka와 연동하기전 결과 확인
[settings.py]
FEED_FORMAT='csv'
FEED_URI='naverfinance.csv'
FEED_EXPORT_ENCODING='utf-8'
4. 네이버 증권 페이지 크롤링하기
아래 코드는 naverfianace 파일이과 scrapy.cfg가 있는 위치에서 실행해야한다.
scrapy crawl financebots
※ 실행 결과 :
5. csv로 저장된 결과 확인 (naverfinanace.csv)
2. Kafka와 연결작업하기 위해 Scrapy코드 수정하기
1. 크롤링 대상에 맞게 코드 수정하기
1) setting.py 파일 수정 : 프로젝트 모듈 간 연결 및 기본 설정 정의 ( pipelines의 순서 결정, 로그 파일지정 등 )
- 파일 내의 ITEM_PIPELINES 부분 주석 해제하기
[settings.py]
ITEM_PIPELINES = {
'naverfinance.pipelines.NaverfinancePipeline': 300,
}
- 의미 : naverfinanace 프로젝트의 piplines.py 파일 안에 있는 NaverfinanacePipeline 클래스를 0.3초 주기를 주면서 실행하겠다.
2) Pipelines.py 파일 수정 : 수집된 데이터 처리 방식 정의(파일저장/DB저장/이메일발송 등)
- item에 저장된 데이터를 기반으로 pipe라인에서 데이터 처리방식 정의 (db저장, 특별한 저장 규칙)
1. json 모듈과, Kafka 모듈 import하기
(import 할 값들)
from json import dumps
from kafka import KafkaProducer
2. NavfinanacePipeline 내 매서드 수정하기
(1) __init__(self) 생성자 매서드
def __init__(self):
self.producer=KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['127.0.0.1:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
self.topic_prefix="finance_top30"
(2) process_item(self,item,spider) 매서드
def process_item(self, item, spider):
prefix=self.topic_prefix
data=dict()
items=dict(item)
data={
"schema":
{"type":"struct",
"fields":[
{"type":"string","field":"rank"},
{"type":"string","field":"name"},
{"type":"string","field":"search_ratio"},
{"type":"string","field":"today_price"},
{"type":"string","field":"up_down"},
{"type":"string","field":"volumn"},
{"type":"string","field":"siga"},
{"type":"string","field":"jega"},
{"type":"string","field":"goga"},
{"type":"string","field":"diff_price"},
{"type":"string","field":"PER"},
{"type":"string","field":"ROE"}
]},
"payload":items
}
review_topic="{prefix}_result".format(prefix=prefix)
self.producer.send(review_topic,data)
self.producer.flush()
return item
3. Kafka 서버 구성하기
1. Kafka 서버 기동하기
[Zookeeper 서버]
(kafka 폴더에서)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
[Kafka broker 서버]
(kafka 폴더에서)
.\bin\windows\kafka-server-start.bat .\config\server.properties
[Kafka Connect 서버]
(confluent 폴더에서)
> .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
2. 위에 작성했던 Scrapy코드 실행하기
1) Topic 생성 여부 확인
(kafka 폴더에서)
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
2) Topic에 데이터 들어갔는지 확인하기
- console-consumer로 확인하기
(kafka 폴더에서)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic finance_top30_result --from-beginning
4. DB구성 및 Kafka Sink Connector 만들기
1. DB 구성하기 (MariaDB 이용)
1) MariaDB에서 데이터베이스 만들기 & 테이블 만들기
CREATE DATABASE finance_db;
USE finance_db;
CREATE TABLE finance_result(
id INT AUTO_INCREMENT primary KEY,
RANK VARCHAR(3) NOT NULL,
NAME VARCHAR(20) NOT NULL,
search_ratio VARCHAR(10) NOT NULL,
today_price VARCHAR(10) NOT NULL,
diff_price VARCHAR(10),
up_down VARCHAR(10),
volumn VARCHAR(10),
siga VARCHAR(10),
jega VARCHAR(10),
goga VARCHAR(10),
PER VARCHAR(10),
ROE VARCHAR(10),
Created_attest_db TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
2. Sink Connector 만들기 (POSTMAN 이용)
1) POSTMAN을 이용해서 Sink Connector 만들기
- Header
- Body
[BODY 내용]
{
"name" : "fianance-sink-connect1",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://localhost:3306/finance_db",
"connection.user" : "root",
"connection.password" : "test1357",
"auto.create" : "true",
"auto.evolve" : "true",
"delete.enabled" : "false",
"tasks.max" : "1",
"topics" : "finance_top30_result",
"errors.tolerance":"all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
}
}
5. 연동 결과 확인
1. DB에서 결과 확인 (HeidiSQL 활용)
- finance_top30_result 내의 데이터 확인
- 서버 전체 구동 내역