-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
my code like this
from camera_new.common.Control import Control
from camera_new.common.Streaming import Data
from camera_new.common.Stream import Streaming
from camera_new.common.Streaming.BlobServerConfiguration import BlobClientConfig
import numpy as np
import cv2
import datetime
import os
from util.conf_util import read_cof, write_cof
import struct
from util.mylogger import mylog
import threading
import time
class VisionaryS:
_instance_lock = threading.Lock()
path = read_cof()['Camera']['image_path']
password = read_cof()['Camera']['password']
shit = read_cof()['Camera']['shit']
saveply = read_cof()['Camera']['saveply']
def __init__(self):
try:
ipAddress = read_cof()['Camera']['ip']
self.deviceControl = Control(ipAddress, "ColaB", 2112)
self.deviceControl.open()
self.deviceControl.stopStream()
time.sleep(0.1)
self.deviceControl.login(Control.USERLEVEL_SERVICE, 'CUST_SERV')
self.streaming_settings = BlobClientConfig(self.deviceControl)
self.streaming_settings.setTransportProtocol(self.streaming_settings.PROTOCOL_TCP)
self.streaming_settings.setBlobTcpPort(2114)
self.deviceStreaming = Streaming(ipAddress, 2114)
self.deviceStreaming.openStream()
self.deviceControl.logout()
self.frameData = Data.Data()
self.poll_period_span = 500 / 1000.0 # Convert milliseconds to seconds
self.last_snap_time = time.time()
except Exception as e:
mylog.error(f"相机连接失败:{e}")
def get_exposure(self):
self.deviceControl.login(Control.USERLEVEL_SERVICE, self.password)
exposure = self.deviceControl.getIntegrationTimeUsColor()
self.deviceControl.logout()
return exposure
def auto_expose(self):
"""
自动曝光
"""
self.deviceControl.login(Control.USERLEVEL_SERVICE, self.password)
for i in range(3):
auto_type = i
auto_exposure_response = self.deviceControl.startAutoExposureParameterized(
struct.pack(">HB", 1, auto_type))
if not auto_exposure_response:
mylog.error(
f"ERROR: Invoking 'TriggerAutoExposureParameterized' fails! (autoExposureResponse: {auto_exposure_response}")
# Wait until auto exposure method is finished
auto_exp_param_running = True
start_time = time.time()
while auto_exp_param_running:
auto_exp_param_running = self.deviceControl.getAutoExposureParameterizedRunning()
time_now = time.time()
# 10 sec (time after auto exposure method should be finished)
if (time_now - start_time) <= 10:
time.sleep(1)
else:
mylog.error(
f"TIMEOUT: auto exposure function (Param: {auto_type}) needs longer than expected!")
integration_time_us_color = self.deviceControl.getIntegrationTimeUsColor()
mylog.info(f"Read IntegrationTimeUSColor: {integration_time_us_color}")
self.deviceControl.logout()
# self.deviceControl.stopStream() # the device starts stream automatically
# self.deviceStreaming.openStream()
# self.deviceStreaming.sendBlobRequest()
write_cof('Camera', 'exposure', integration_time_us_color)
return integration_time_us_color
def camera_trigger(self):
time_since_last_snap = time.time() - self.last_snap_time
if time_since_last_snap < self.poll_period_span:
time_to_wait = self.poll_period_span - time_since_last_snap
time.sleep(time_to_wait)
self.last_snap_time = time.time()
self.deviceControl.singleStep()
self.deviceStreaming.getFrame()
self.frameData.read(self.deviceStreaming.frame)
if self.frameData.hasDepthMap:
zmapData = list(self.frameData.depthmap.distance)
zmapDataArray = np.uint16(np.reshape(zmapData, (512, 640, 1)))
rgbaDataArray = np.uint32(np.reshape(self.frameData.depthmap.intensity, (512, 640)))
rgbaDataArray = np.frombuffer(rgbaDataArray, np.uint8)
rgbaDataArray = np.reshape(rgbaDataArray, (512, 640, 4))
img_bgr = cv2.cvtColor(rgbaDataArray, cv2.COLOR_RGBA2BGRA)
filename = datetime.datetime.now().strftime('%d_%H_%M_%S') + ".jpg"
if not os.path.exists(self.path):
os.makedirs(self.path)
filepath = self.path + '/' + filename
cv2.imwrite(filepath, img_bgr)
return img_bgr, zmapDataArray
else:
return None, None
def camera_close(self):
self.deviceControl.login(Control.USERLEVEL_AUTH_CLIENT, 'CLIENT')
self.deviceStreaming.closeStream()
self.streaming_settings.setBlobTcpPort(2114)
self.deviceControl.logout()
self.deviceControl.close()
mylog.info('相机断开连接')
def camera_set_exposure(self, size):
self.deviceControl.login(Control.USERLEVEL_SERVICE, self.password)
try:
self.deviceControl.setIntegrationTimeUsColor(size)
write_cof('Camera', 'exposure', size)
mylog.info('相机曝光{}us 成功'.format(size))
except Exception as e:
mylog.info('相机曝光失败{},启动自动曝光'.format(e))
self.auto_expose()
finally:
self.deviceControl.logout()
def cam2word(self, center_x, center_y, zc):
cx = self.frameData.cameraParams.cx
fx = self.frameData.cameraParams.fx
cy = self.frameData.cameraParams.cy
fy = self.frameData.cameraParams.fy
m_c2w = self.frameData.cameraParams.cam2worldMatrix
xp = (cx - center_x) / fx
yp = (cy - center_y) / fy
xc = xp * zc
yc = yp * zc
x_w = round(m_c2w[3] + zc * m_c2w[2] + yc * m_c2w[1] + xc * m_c2w[0], 0)
y_w = round(m_c2w[7] + zc * m_c2w[6] + yc * m_c2w[5] + xc * m_c2w[4], 0)
z_w = round(m_c2w[11] + zc * m_c2w[10] + yc * m_c2w[9] + xc * m_c2w[8], 0)
return x_w, y_w, z_w
if __name__ == '__main__':
vs = VisionaryS()
for _ in range(100):
img, zmap = vs.camera_trigger()
vs.auto_expose()
Metadata
Metadata
Assignees
Labels
No labels