Project
모바일 로봇 프로젝트 1 - YOLO
- -
먼저 모바일 로봇 (또는 모바일 로봇 위에 올라갈 라즈베리파이)의 카메라에서 실시간 영상을 서버에서 받고 처리하기 위해
아래와 같은 과정을 기획해 보았다.
로봇은 다른 복잡한 기능 없이 오직 실시간 영상만 보내면 되니깐 flask를 이용하자
아래는 로봇의 코드이다.
sender.py
import cv2
from flask import Flask, Response
app = Flask(__name__)
# OpenCV로 웹캠 캡처 객체 생성 (0은 기본 웹캠을 의미)
camera = cv2.VideoCapture(0) # 0 2 4
def generate_frames():
"""웹캠에서 프레임을 가져와 스트리밍"""
while True:
success, frame = camera.read() # 웹캠에서 프레임 읽기
if not success:
break
else:
frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
# 프레임을 JPEG로 인코딩
ret, buffer = cv2.imencode('.jpg', frame)
frame = buffer.tobytes()
# 프레임 데이터를 전송
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/video_feed')
def video_feed():
"""비디오 스트리밍"""
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == "__main__":
# Flask 앱 실행 (호스트를 0.0.0.0으로 설정하여 외부에서 접근 가능)
app.run(host='0.0.0.0', port=5000, threaded=True)
다음은 내 컴퓨터(서버)의 코드이다.
receiver.py
import cv2
import requests
import numpy as np
# 라즈베리파이 서버의 스트리밍 URL
url = "http://xxx.xxx.0.xx:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
# OpenCV로 이미지 표시
cv2.imshow("Raspberry Pi Stream", frame)
if cv2.waitKey(1) & 0xFF == ord('q'): # 'q'를 누르면 종료
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
그러면 영상이 보일것이다.
이 영상에 yolo를 적용해보자.
아래는 내 컴퓨터(서버)의 코드이다.
import cv2
import requests
import numpy as np
from ultralytics import YOLO
# YOLO 모델 로드
model = YOLO("yolo11n.pt") # "yolov8n.pt"를 YOLO 모델 파일로 변경 가능
# 모바일로봇 서버의 스트리밍 URL
url = "http://xxx.xxx.0.xx:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
# YOLO 모델로 객체 탐지
results = model.track(source=frame, conf=0.5, show=False, stream=True, verbose=False)
# 탐지 결과 처리 및 바운딩 박스 그리기
for result in results:
boxes = result.boxes # 탐지된 객체의 박스 정보
for box in boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 바운딩 박스 좌표
conf = box.conf[0] # 신뢰도
cls = int(box.cls[0]) # 클래스 ID
# label = f"{model.names[cls]} {conf:.2f}" # 클래스 이름 및 신뢰도
label = f"{model.names[cls]}"
# 바운딩 박스와 라벨 그리기
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0), 2)
# OpenCV로 이미지 표시
cv2.imshow("YOLO Object Detection", frame)
# 'q'를 누르면 종료
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
우리는 모바일로봇이 사람만 쫒아다니면 되니깐 클래스를 사람으로 정해준다.
results = model.track(source=frame, classes=[0], conf=0.5, show=False, stream=True, verbose=False)
이처럼 classes 옵션만 넣어주면 된다.
import cv2
import requests
import numpy as np
from ultralytics import YOLO
# YOLO 모델 로드
model = YOLO("yolo11n.pt") # "yolov8n.pt"를 YOLO 모델 파일로 변경 가능
# 모바일로봇 서버의 스트리밍 URL
url = "http://xxx.xxx.0.xx:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
# YOLO 모델로 객체 탐지
results = model.track(source=frame, classes=[0], conf=0.5, show=False, stream=True, verbose=False)
# 탐지 결과 처리 및 바운딩 박스 그리기
for result in results:
boxes = result.boxes # 탐지된 객체의 박스 정보
for box in boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 바운딩 박스 좌표
conf = box.conf[0] # 신뢰도
cls = int(box.cls[0]) # 클래스 ID
# label = f"{model.names[cls]} {conf:.2f}" # 클래스 이름 및 신뢰도
label = f"{model.names[cls]}"
# 바운딩 박스와 라벨 그리기
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (0, 255, 0), 2)
# OpenCV로 이미지 표시
cv2.imshow("YOLO Object Detection", frame)
# 'q'를 누르면 종료
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
'Project' 카테고리의 다른 글
모바일 로봇 프로젝트 3 - Human Pose (0) | 2024.11.30 |
---|---|
모바일 로봇 프로젝트 2 - SAM2 (2) | 2024.11.30 |
Langchain 프롬프트 메이커 및 미니 프로젝트 (0) | 2024.09.24 |
LoGO 해외로고 프로젝트 - AIRFLOW 도입기 (0) | 2024.09.10 |
[RAG 프로젝트][데이콘] 재정정보 AI 검색 알고리즘 경진대회 3 (0) | 2024.08.21 |
Contents
소중한 공감 감사합니다