이 글에서는 Python과 PyQt5를 사용하여 eBUS SDK Python API를 기반으로 Jai 카메라와의 다중 채널 카메라 스트리밍 애플리케이션을 개발하는 과정을 설명합니다. 이 프로젝트는 여러 카메라에서 실시간 스트리밍 데이터를 수집하고, 이를 PyQt5 기반 UI에서 표시하며, 각 채널의 노출 시간을 조정할 수 있는 기능을 포함합니다. Jai 카메라를 통해 수집한 이미지를 활용한 딥러닝 작업을 진행하기에 앞서 개발중에 있는 카메라 관련 초기 개발모델에 대해서 정의합니다.
프로젝트 구성
프로젝트는 다음 세 가지 주요 파일로 구성되어 있습니다:
- main.py: 애플리케이션의 진입점으로, PyQt5 애플리케이션을 초기화하고 주요 클래스와 UI를 실행합니다.
- device_manager.py: eBUS SDK를 활용해 장치를 연결하고 데이터를 수집하는 핵심 로직이 포함된 클래스입니다.
- multi_camera_stream_app.py: PyQt5 기반 UI를 제공하며, 여러 카메라 스트림을 표시하고 각 채널의 설정을 관리합니다.
주요 구현
1. main.py: 애플리케이션 초기화
def main():
app = QApplication([])
# eBUSDeviceManager를 초기화하고 장치 연결
device_manager = eBUSDeviceManager()
try:
device_manager.connect(num_channels=3)
device_manager.start_acquisition()
# UI 초기화 및 실행
stream_app = MultiChannelStreamApp(device_manager=device_manager, num_channels=3)
stream_app.show()
sys.exit(app.exec_())
except Exception as e:
print(f"오류 발생: {e}")
finally:
device_manager.cleanup()
이 코드는 애플리케이션을 초기화하고 device_manager를 통해 카메라를 연결 및 스트리밍을 시작합니다. UI는 MultiChannelStreamApp 클래스를 사용하여 생성됩니다.
2. device_manager.py: 장치 연결 및 데이터 관리
주요 클래스: eBUSDeviceManager
- 장치 연결 (connect_to_device)
def connect_to_device(self, connection_ID):
print("장치에 연결 중...")
result, device = eb.PvDevice.CreateAndConnect(connection_ID)
if not device:
raise Exception(f"연결 실패: {result.GetCodeString()} ({result.GetDescription()})")
print("장치가 연결되었습니다.")
return device
connection_ID를 사용해 PvDevice.CreateAndConnect를 통해 장치를 연결합니다.
- 채널 구성 (configure_channels)
def configure_channels(self, device, num_channels=3):
params = device.GetParameters()
channel_selector = params.Get("GevStreamChannelSelector")
scp_port = params.Get("GevSCPHostPort")
gev_scda = params.Get("GevSCDA")
if not gev_scda or not scp_port or not channel_selector:
raise Exception("매개변수 누락: GevSCDA, GevSCPHostPort, 또는 GevStreamChannelSelector.")
for channel in range(num_channels):
print(f"채널 {channel} 구성 중...")
if not channel_selector.SetValue(channel).IsOK():
raise Exception(f"채널 {channel} 선택 실패")
assigned_port = 60000 + channel
if scp_port.IsWritable() and not scp_port.SetValue(assigned_port).IsOK():
raise Exception(f"채널 {channel}에 대한 GevSCPHostPort 설정 실패")
ip_address = (255 << 24) | (255 << 16) | (100 << 8) | 100
if gev_scda.IsWritable() and not gev_scda.SetValue(ip_address).IsOK():
raise Exception(f"채널 {channel}에 대한 GevSCDA IP 설정 실패")
채널 선택(GevStreamChannelSelector), 호스트 포트(GevSCPHostPort)와 IP 주소(GevSCDA) 설정.
- 스트림 열기 (open_stream)
def open_stream(self, connection_ID, channel):
print(f"채널 {channel}에 대한 스트림 열기 중...")
stream = eb.PvStreamGEV()
if not stream.Open(connection_ID, 60000 + channel).IsOK():
raise Exception(f"채널 {channel}에 대한 스트림 열기 실패")
print(f"채널 {channel}에 대한 스트림이 열렸습니다.")
return stream
PvStreamGEV를 사용해 각 채널의 스트림을 열고 포트를 기반으로 스트림을 구성합니다.
- PvStreamGEV는 eBUS SDK의 스트림 객체로, 스트림 데이터 전송을 처리합니다.
- 파이프라인 구성 (configure_pipeline)
def configure_pipeline(self, device, stream):
print("파이프라인 초기화 중...")
pipeline = eb.PvPipeline(stream)
pipeline.SetBufferCount(16)
pipeline.SetBufferSize(device.GetPayloadSize())
pipeline.Start()
print("파이프라인이 시작되었습니다.")
return pipeline
스트림 데이터를 처리하기 위한 버퍼 크기와 버퍼 수를 설정합니다.
- 프레임 수집 (fetch_frame)
def fetch_frame(self, channel, timeout=5000):
if not self.connected:
print(f"장치가 연결되지 않았습니다. 채널={channel}")
return None
pipeline = self.pipelines[channel]
for _ in range(3):
result, buffer, op_result = pipeline.RetrieveNextBuffer(timeout)
if result.IsOK() and op_result.IsOK() and buffer.GetPayloadType() == eb.PvPayloadTypeImage:
image = buffer.GetImage()
frame_data = np.array(image.GetDataPointer(), dtype=np.uint8).reshape((image.GetHeight(), image.GetWidth(), -1))
pipeline.ReleaseBuffer(buffer)
return frame_data
pipeline.ReleaseBuffer(buffer)
print(f"채널 {channel}에 대한 프레임 가져오기 실패.")
return None
지정된 채널에서 스트림 데이터를 수집하고 이미지를 반환합니다.
- 노출시간 수집/설정
def get_exposure(self, channel):
if not self.connected:
raise Exception("장치가 연결되지 않았습니다.")
params = self.device.GetParameters()
if not params.Get("GevStreamChannelSelector").SetValue(channel).IsOK():
raise Exception(f"채널 {channel} 선택 실패")
result, exposure_value = params.Get("ExposureTime").GetValue()
if not result.IsOK():
raise Exception(f"채널 {channel}에 대한 노출 시간 가져오기 실패")
return exposure_value
def set_exposure(self, channel, exposure_time):
if not self.connected:
raise Exception("장치가 연결되지 않았습니다.")
params = self.device.GetParameters()
if not params.Get("GevStreamChannelSelector").SetValue(channel).IsOK():
raise Exception(f"채널 {channel} 선택 실패")
exposure_param = params.Get("ExposureTime")
if not exposure_param.SetValue(exposure_time).IsOK():
raise Exception(f"채널 {channel}에 대한 노출 시간 설정 실패")
result, current_exposure = exposure_param.GetValue()
if result.IsOK():
print(f"채널 {channel}에 대한 노출 시간이 {current_exposure}로 설정되었습니다.")
채널별 현재 노출값을 가져오거나 설정할 수 있습니다.
3. multi_camera_stream_app.py: UI 구현
주요 클래스: MultiChannelStreamApp
- UI 초기화 (init_ui)
def init_ui(self):
self.setWindowTitle("다중 채널 스트림 뷰어")
self.central_widget = QWidget()
self.grid_layout = QGridLayout(self.central_widget)
self.image_labels = [self.create_image_label(i) for i in range(self.num_channels)]
각 카메라 채널에 대한 이미지 뷰어를 생성하고 UI에 배치합니다.
- 노출 시간 설정
def update_exposure_settings(self, channel, input_field, current_label):
exposure = float(input_field.text())
self.device_manager.set_exposure(channel, exposure)
current_label.setText(f"현재 노출 값: {exposure}")
사용자가 입력한 값을 장치에 전달하여 채널의 노출 시간을 설정합니다.
- 프레임 업데이트
def update_frames(self):
for i in range(self.num_channels):
frame = self.device_manager.fetch_frame(i)
if frame is not None:
self.display_frame(frame, self.image_labels[i])
각 채널의 프레임을 수집하여 UI에 표시합니다.
실행 화면
- 다중 채널 스트림 뷰어
- 각 채널의 스트림 데이터가 실시간으로 표시됩니다.
- 각 채널의 노출 시간을 조정할 수 있는 입력 필드와 버튼이 제공됩니다.
- 노출 시간 조정
- 사용자가 노출 시간을 입력하고 설정 버튼을 클릭하면 해당 채널의 설정이 업데이트됩니다.
마무리
이 프로젝트는 다중 채널 카메라 스트리밍을 하나의 UI에서 관리하고자 하는 목적으로 개발하게 되었으며, eBUS SDK를 활용한 개발에 있어서 어떤 방식으로 개발하면 될지에 대한 하나의 접근법에 대해서 설명하고자 정리해 보았습니다.
추가 자료
- Jai eBUS SDK 공식 문서: https://www.jai.com/kr/support-software/jai-software
- PyQt5 공식 문서: https://www.riverbankcomputing.com/static/Docs/PyQt5/