feat: use redis to communicate movement
This commit is contained in:
@@ -11,9 +11,28 @@
|
||||
**********************************************************************
|
||||
'''
|
||||
|
||||
import smbus
|
||||
from smbus2 import SMBus
|
||||
import time
|
||||
import math
|
||||
import os
|
||||
|
||||
|
||||
class MockSMBus:
|
||||
def __init__(self, bus_number):
|
||||
self.bus_number = bus_number
|
||||
self.data = {}
|
||||
|
||||
def write_byte_data(self, addr, res, value):
|
||||
print(f"SMBus write {addr}[{res}] {value}")
|
||||
if addr not in self.data:
|
||||
self.data[addr] = {}
|
||||
self.data[addr][res] = value if value else 0
|
||||
|
||||
def read_byte_data(self, addr, res):
|
||||
print(f"SMBus read {addr}[{res}]")
|
||||
if addr not in self.data:
|
||||
self.data[addr] = {}
|
||||
return self.data[addr][res] if res in self.data[addr] else 0
|
||||
|
||||
|
||||
def map_range(x, in_min, in_max, out_min, out_max):
|
||||
@@ -53,7 +72,10 @@ class PWM:
|
||||
self.bus_number = 1
|
||||
else:
|
||||
self.bus_number = bus_number
|
||||
self.bus = smbus.SMBus(self.bus_number)
|
||||
if os.environ['PCA9685_MOCK_SMBUS'] == 'true':
|
||||
self.bus = MockSMBus(self.bus_number)
|
||||
else:
|
||||
self.bus = SMBus(self.bus_number)
|
||||
self._frequency = 60
|
||||
|
||||
def _debug_(self, message):
|
||||
@@ -109,7 +131,7 @@ class PWM:
|
||||
prescale = math.floor(prescale_value + 0.5)
|
||||
self._debug_('Final pre-scale: %d' % prescale)
|
||||
|
||||
old_mode = self._read_byte_data(self._MODE1);
|
||||
old_mode = self._read_byte_data(self._MODE1)
|
||||
new_mode = (old_mode & 0x7F) | 0x10
|
||||
self._write_byte_data(self._MODE1, new_mode)
|
||||
self._write_byte_data(self._PRESCALE, int(math.floor(prescale)))
|
||||
|
||||
@@ -3,7 +3,7 @@ import collections
|
||||
from .pca9685 import ServoController, map_range
|
||||
from .dc_motor_controller import DcMotor, setup_gpio
|
||||
|
||||
ServoMinMax = collections.namedtuple('ServoMinMax', ['minval', 'maxval'])
|
||||
ServoMinMax = collections.namedtuple('ServoMinMax', ['minval', 'maxval', 'restval', 'maxstep'])
|
||||
|
||||
try:
|
||||
from RPi import GPIO
|
||||
@@ -25,13 +25,13 @@ SERVO_NECK_TOP = 10
|
||||
SERVO_NECK_BOTTOM = 9
|
||||
|
||||
SERVO_MIN_MAX = {
|
||||
SERVO_ARM_L: ServoMinMax(50, 130),
|
||||
SERVO_ARM_R: ServoMinMax(50, 130),
|
||||
SERVO_EYE_L: ServoMinMax(40, 100),
|
||||
SERVO_EYE_R: ServoMinMax(80, 120),
|
||||
SERVO_NECK_ROTATE: ServoMinMax(60, 120), # 60 - 90 - 120
|
||||
SERVO_NECK_TOP: ServoMinMax(30, 180),
|
||||
SERVO_NECK_BOTTOM: ServoMinMax(10, 180),
|
||||
SERVO_ARM_L: ServoMinMax(50, 130, 50, 5),
|
||||
SERVO_ARM_R: ServoMinMax(50, 130, 130, 5),
|
||||
SERVO_EYE_L: ServoMinMax(40, 100, 40, 5),
|
||||
SERVO_EYE_R: ServoMinMax(80, 120, 120, 5),
|
||||
SERVO_NECK_ROTATE: ServoMinMax(60, 120, 90, 5), # 60 - 90 - 120
|
||||
SERVO_NECK_TOP: ServoMinMax(30, 180, 30, 5),
|
||||
SERVO_NECK_BOTTOM: ServoMinMax(10, 180, 10, 5),
|
||||
}
|
||||
|
||||
|
||||
@@ -42,14 +42,37 @@ class WallE:
|
||||
self.motor_b = DcMotor(PIN_MOTOR_B_ENABLE, PIN_MOTOR_B_REVERSE)
|
||||
self.servo_controller = ServoController()
|
||||
self.servo_controller.setup()
|
||||
self.servo_positions = {}
|
||||
self.servo_targets = {}
|
||||
|
||||
def setup(self):
|
||||
for channel, min_max in SERVO_MIN_MAX.items():
|
||||
self.servo_controller.write(channel, min_max.restval)
|
||||
self.servo_positions[channel] = min_max.restval
|
||||
|
||||
def tick(self):
|
||||
for channel, target in self.servo_targets.items():
|
||||
current_value = self.servo_positions[channel]
|
||||
if target == current_value:
|
||||
del self.servo_targets[channel]
|
||||
continue
|
||||
try:
|
||||
servo_min_max = SERVO_MIN_MAX[channel]
|
||||
except IndexError:
|
||||
continue
|
||||
delta = abs(current_value - target)
|
||||
step_size = min(delta, servo_min_max.maxstep)
|
||||
new_val = current_value + step_size if target > current_value else current_value - step_size
|
||||
self.servo_controller.write(channel, new_val)
|
||||
self.servo_positions[channel] = new_val
|
||||
|
||||
def set_servo(self, channel, value):
|
||||
try:
|
||||
min_val, max_val = SERVO_MIN_MAX[channel]
|
||||
servo_min_max = SERVO_MIN_MAX[channel]
|
||||
except IndexError:
|
||||
return None
|
||||
value = min(max_val, max(min_val, value))
|
||||
self.servo_controller.write(channel, value)
|
||||
value = min(servo_min_max.maxval, max(servo_min_max.minval, value))
|
||||
self.servo_targets[channel] = value
|
||||
|
||||
def set_arm_l(self, val):
|
||||
self.set_servo(SERVO_ARM_L, val)
|
||||
|
||||
Reference in New Issue
Block a user