Project
AI캐디 프로젝트 1 - Flask 실시간 webcam - SAM2
- -
모바일로봇의 카메라를 실시간으로 입력받아서 GPU가 있는 컴퓨터에서 인공지능을 돌리고
그 결과값을 모바일 로봇에게 보내 제어를 하려고 한다.
실시간으로 모바일로봇 카메라를 수신받는 방법이 무엇이 있을까
Socket 방식에서 TCP와 UDP 방식이 있을 수 있고
HTTP 방식이 있을 수 있다.
나는 로봇이 보내는 영상을 휴먼 팔로잉할때도 써야하고 Action Recognition 할때도 써야하니깐
쉽게 접근 가능한 HTTP 방식의 Flask를 사용하기로 하였다.
로봇 / 보내는 쪽 (송신) 코드
더보기
import cv2
from flask import Flask, Response
import time
# import request
app = Flask(__name__)
# OpenCV로 웹캠 캡처 객체 생성
camera = cv2.VideoCapture(1)
def generate_frames():
"""웹캠에서 프레임을 가져와 스트리밍"""
while True:
success, frame = camera.read() # 웹캠에서 프레임 읽기
if not success:
break
else:
# 프레임 크기 조정 (640x480)
height, width, channel = frame.shape
# frame = cv2.resize(frame, (int(width//2), int(height//2)))
# frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
# JPEG 인코딩 품질 조정
ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
frame = buffer.tobytes()
# 프레임 데이터를 전송
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
# FPS 제한 (30 FPS)
time.sleep(1 / 30)
@app.route('/video_feed')
def video_feed():
"""비디오 스트리밍"""
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == "__main__":
# Flask 앱 실행
try:
app.run(host='0.0.0.0', port=5000, threaded=True)
finally:
# 리소스 해제
if camera.isOpened():
camera.release()
받는 쪽 (수신) 코드
더보기
import cv2
import requests
import numpy as np
# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
# OpenCV로 이미지 표시
cv2.imshow("YOLO Object Detection", frame)
# 'q'를 누르면 종료
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
다음으로 Yolo를 적용해보자.
utils 폴더 안에 yolo_fuc.py 코드
더보기
# YOLO 모델로 객체 탐지
from ultralytics import YOLO
model = YOLO("./models/yolo11n.pt")
def get_yolo():
return model
def get_bbox(frame):
results = model.track(source=frame, classes=[0], conf=0.5, show=False, stream=True, verbose=False)
largest_box = None # 가장 큰 바운딩 박스를 저장할 변수
largest_area = 0 # 가장 큰 바운딩 박스의 면적
# 탐지 결과 처리
for result in results:
boxes = result.boxes # 탐지된 객체의 박스 정보
for box in boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 바운딩 박스 좌표
area = (x2 - x1) * (y2 - y1) # 바운딩 박스 면적 계산
# 가장 큰 바운딩 박스 갱신
if area > largest_area:
largest_area = area
largest_box = (x1, y1, x2, y2, box.conf[0], int(box.cls[0])) # 좌표, 신뢰도, 클래스 저장
return largest_box
다음으로 영상 실시간으로 수신받고 yolo 적용 코드
더보기
import cv2
import requests
import numpy as np
from utils.yolo_fuc import get_bbox
# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True)
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
frame = cv2.flip(frame, 1)
largest_box = get_bbox(frame)
# 가장 큰 바운딩 박스가 있는 경우 화면에 그리기
if largest_box:
x1, y1, x2, y2, conf, cls = largest_box
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
# OpenCV로 이미지 표시
cv2.imshow("YOLO Object Detection", frame)
# 'q'를 누르면 종료
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
실시간으로 잘 작동한다.
그렇다면 이번에는 sam2를 적용해보자.
sam2 환경을 먼저 구축해준다.
https://github.com/khw11044/SAM2_streaming
다음은 utils 폴더안에 sam2_fuc.py 코드이다.
더보기
import torch
from sam2.build_sam import build_sam2_camera_predictor
# use bfloat16 for the entire notebook
torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__()
if torch.cuda.get_device_properties(0).major >= 8:
# turn on tfloat32 for Ampere GPUs (https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
sam2_checkpoint = "./checkpoints/sam2_hiera_small.pt"
model_cfg = "sam2_hiera_s.yaml"
predictor = build_sam2_camera_predictor(model_cfg, sam2_checkpoint)
def get_predictor():
return predictor
그리고 이 함수를 임포트해서 수신받는 컴퓨터 코드를 작성한다.
더보기
import cv2
import requests
import numpy as np
from utils.sam2_fuc import get_predictor
from utils.yolo_fuc import get_yolo, get_bbox
sam2 = get_predictor()
yolo = get_yolo()
# 모바일로봇 서버의 스트리밍 URL
url = "http://192.168.0.127:5000/video_feed" # Flask 서버의 /video_feed URL
# 스트리밍 데이터 읽기
stream = requests.get(url, stream=True, timeout=5)
if_init = False
largest_bbox=None
if stream.status_code == 200:
print("Streaming 연결 성공")
byte_data = b"" # 스트리밍 데이터를 저장할 바이트 버퍼
for chunk in stream.iter_content(chunk_size=1024): # 1KB 단위로 데이터 읽기
byte_data += chunk
a = byte_data.find(b'\xff\xd8') # JPEG 시작 부분
b = byte_data.find(b'\xff\xd9') # JPEG 끝 부분
if a != -1 and b != -1: # JPEG 이미지의 시작과 끝이 존재할 때
jpg = byte_data[a:b+2] # JPEG 이미지 추출
byte_data = byte_data[b+2:] # 읽은 데이터 버퍼에서 제거
# JPEG 데이터를 OpenCV 이미지로 디코딩
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
frame = cv2.flip(frame, 1)
width, height = frame.shape[:2][::-1]
# 중심점 계산
center_x, center_y = width // 2, height // 2
if not largest_bbox:
largest_bbox = get_bbox(frame)
# 가장 큰 바운딩 박스가 있는 경우 화면에 그리기
if largest_bbox:
x1, y1, x2, y2, conf, cls = largest_bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
if largest_bbox and not if_init:
sam2.load_first_frame(frame)
bbox = np.array([[largest_bbox[0], largest_bbox[1]],
[largest_bbox[2], largest_bbox[3]]], dtype=np.float32)
_, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox)
if_init = True
elif if_init:
out_obj_ids, out_mask_logits = sam2.track(frame)
all_mask = np.zeros((height, width, 1), dtype=np.uint8)
for i in range(len(out_obj_ids)):
out_mask = (out_mask_logits[i] > 0.0).permute(1, 2, 0).byte().cuda()
all_mask = cv2.bitwise_or(all_mask, out_mask.cpu().numpy() * 255)
# 마스크 적용
if all_mask is not None:
all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2RGB)
frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0)
# OpenCV로 이미지 표시
cv2.imshow("YOLO Object Detection", frame)
# 'q'를 누르면 종료
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
print(f"Streaming 연결 실패: 상태 코드 {stream.status_code}")
# OpenCV 윈도우 닫기
cv2.destroyAllWindows()
역시나 느리다.
어떻게 해야할까?
멀티 쓰레드를 사용하는거다.
더보기
import cv2
import requests
import numpy as np
import threading
import queue
import torch
from utils.sam2_fuc import get_predictor
from utils.yolo_fuc import get_yolo, get_bbox
# 모델 및 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# SAM2와 YOLO 모델 초기화
sam2 = get_predictor()
yolo = get_yolo()
# 스트리밍 URL 및 상태 변수
url = "http://192.168.0.127:5000/video_feed"
frame_queue = queue.Queue(maxsize=5) # 프레임 큐
if_init = False
largest_bbox = None
running = True
# 실시간 스트림을 수신하는 스레드
def stream_frames():
global running
print("스트리밍 시작...")
stream = requests.get(url, stream=True, timeout=5)
if stream.status_code == 200:
byte_data = b""
for chunk in stream.iter_content(chunk_size=1024):
if not running:
break
byte_data += chunk
a = byte_data.find(b'\xff\xd8')
b = byte_data.find(b'\xff\xd9')
if a != -1 and b != -1:
jpg = byte_data[a:b+2]
byte_data = byte_data[b+2:]
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
frame = cv2.flip(frame, 1)
if not frame_queue.full():
frame_queue.put(frame)
else:
print(f"스트리밍 실패: {stream.status_code}")
# YOLO 및 SAM2로 프레임 처리하는 스레드
def process_frames():
global running, if_init, largest_bbox
print("프레임 처리 시작...")
while running:
if not frame_queue.empty():
frame = frame_queue.get()
height, width = frame.shape[:2]
# YOLO를 통해 가장 큰 객체 감지
if not largest_bbox:
largest_bbox = get_bbox(frame)
if largest_bbox:
x1, y1, x2, y2, _, _ = largest_bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
# SAM2 모델로 객체 세그멘테이션
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
if largest_bbox and not if_init:
sam2.load_first_frame(frame)
bbox = np.array([[x1, y1], [x2, y2]], dtype=np.float32)
_, out_obj_ids, out_mask_logits = sam2.add_new_prompt(frame_idx=0, obj_id=1, bbox=bbox)
if_init = True
elif if_init:
out_obj_ids, out_mask_logits = sam2.track(frame)
all_mask = torch.zeros((height, width), dtype=torch.uint8, device=device)
for i in range(len(out_obj_ids)):
out_mask = (out_mask_logits[i] > 0.0).byte()
all_mask = torch.bitwise_or(all_mask, out_mask.squeeze(0))
all_mask = all_mask.cpu().numpy() * 255
all_mask = cv2.cvtColor(all_mask, cv2.COLOR_GRAY2BGR)
frame = cv2.addWeighted(frame, 1, all_mask, 0.5, 0)
# 최종 프레임 출력
cv2.imshow("YOLO Object Detection & SAM2 Segmentation", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
running = False
break
# 스레드 시작
stream_thread = threading.Thread(target=stream_frames)
process_thread = threading.Thread(target=process_frames)
stream_thread.start()
process_thread.start()
# 스레드 종료 대기
stream_thread.join()
process_thread.join()
# 리소스 정리
cv2.destroyAllWindows()
print("프로그램 종료.")
최종 코드 보기
https://github.com/khw11044/flask-SAM2-multi-thread
'Project' 카테고리의 다른 글
AI캐디 프로젝트 3 - Depth anything V2- webcam real-time (0) | 2024.12.12 |
---|---|
AI캐디 프로젝트 2 - Action Recognition (0) | 2024.12.10 |
모바일 로봇 프로젝트 3 - Human Pose (0) | 2024.11.30 |
모바일 로봇 프로젝트 2 - SAM2 (2) | 2024.11.30 |
모바일 로봇 프로젝트 1 - YOLO (0) | 2024.11.30 |
Contents
소중한 공감 감사합니다