How To Use the Raspberry Pi Python SDK
Where to get the code
Pull down the newest Python SDK from the GitHub repo!
You can follow our Getting Started with Raspberry Pi instructions for more details.
Asynchronous
API and Shell (Pinging)
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
API and Shell (Pinging) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake. In order to test it,
a node.js server must be running on the raspberry-pi connected to RVR. This code is meant to be
executed from a separate computer.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection)
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
async def main():
""" This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
to report colors detected. To exit program, press <CTRL-C>
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
async def main():
""" This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.
To exit program, press <CTRL-C>
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Drive - Raw Motors
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.reverse.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.off.value,
left_speed=0, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.off.value,
right_speed=0 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading - Reverse Mode
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.drive_forward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.drive_backward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.turn_left_degrees(
heading=0, # Valid heading values are 0-359
amount=90
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper - Roll
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.roll_start(
speed=64,
heading=90
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.roll_stop(heading=270)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program has RVR drive using the Rest API. In order to test it, a node.js server must be running on the
raspberry-pi connected to RVR. This code is meant to be executed from a separate computer.
Note:
To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
go on and execute all the statements and exit without the driving ever taking place.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Infrared - Broadcast IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
for i in range(2):
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(2)
await rvr.stop_robot_to_robot_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Broadcast IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.infrared_control.start_infrared_broadcasting(
far_code=InfraredCodes.one,
near_code=InfraredCodes.zero
)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(4)
await rvr.infrared_control.stop_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64
while True:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)
codes = [
InfraredCodes.zero,
InfraredCodes.one,
InfraredCodes.two,
InfraredCodes.three
]
while True:
await rvr.infrared_control.send_infrared_messages(
messages=codes,
strength=64
)
print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def infrared_message_received_handler(response):
print('Response data for IR message received:',response)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
To try this out, write a script for your other robot that a) broadcasts on the corresponding
channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which
RVR sends messages on [in this case channel 3]
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64
while True:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set the all the LEDs.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_all_leds_color(color=Colors.yellow)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set multiple LEDs.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
0, 0, 255,
255, 0, 0
]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_multiple_leds_with_enums(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
Colors.green,
Colors.blue
]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_multiple_leds_with_rgb(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
255, 0, 0,
0, 255, 0
]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program demonstrates how to set multiple LEDs.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
0, 0, 255,
255, 0, 0
]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Single LED
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set a single LED.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_right.value, # 0xe00
led_brightness_values=[255, 0, 0]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value, # 0x1c0
led_brightness_values=[0, 255, 0]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Single LED with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set a single LED on RVR using the LED control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_led_rgb(
led=RvrLedGroups.headlight_right,
red=255,
green=0,
blue=0
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_led_color(
led=RvrLedGroups.headlight_left,
color=Colors.green
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Get Battery State
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to retrieve the battery state of RVR.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
battery_percentage = await rvr.get_battery_percentage()
print('Battery percentage: ', battery_percentage)
battery_voltage_state = await rvr.get_battery_voltage_state()
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Set Battery State Notifications
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def battery_voltage_state_change_handler(battery_voltage_state):
print('Battery voltage state: ', battery_voltage_state)
async def main():
""" This program demonstrates how to enable battery state change notifications.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
await rvr.enable_battery_voltage_state_change_notify(is_enabled=True)
print('Waiting for battery notification...')
# The asyncio loop will run forever to give the aforementioned events time to occur
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Get Battery State with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program demonstrates how to retrieve the battery state of RVR and print it to the console.
Echo can be used to check to see if RVR is connected and awake. In order to test it, a node.js
server must be running on the raspberry-pi connected to RVR. This code is meant to be executed
from a separate computer.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
battery_percentage = await rvr.get_battery_percentage()
print('Battery percentage: ', battery_percentage)
battery_voltage_state = await rvr.get_battery_voltage_state()
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Sleep Monitoring
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def will_sleep_handler():
print('RVR is about to sleep...')
# here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset
async def did_sleep_handler():
print('RVR is asleep...')
async def main():
""" This program demonstrates how to register handlers for a) the event received 10 seconds
before RVR will sleep unless some new command is issued and b) the event received
when RVR does go to sleep.
Note that these notifications are received without the need to enable them on the robot.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_will_sleep_notify(handler=will_sleep_handler)
await rvr.on_did_sleep_notify(handler=did_sleep_handler)
# The asyncio loop will run forever to give the aforementioned events time to occur
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Change Stream Settings
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def imu_handler(imu_data):
print('IMU data response: ', imu_data)
async def ambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
async def velocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
async def main():
""" This program demonstrates how to update sensor streaming parameters at runawait asyncio.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Clearing all services', '----------')
await rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor data
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Change Stream Settings with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010
)
)
async def imu_handler(imu_data):
print('IMU data response: ', imu_data)
async def ambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
async def velocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
async def main():
""" This program has RVR drive around in different directions using the function raw_motors.
Note:
To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
go on and execute all the statements and exit without the driving ever taking place.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
await asyncio.sleep(5)
print('----------', 'Clearing all services', '----------')
await rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor data
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Multi Sensor Stream
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def imu_handler(imu_data):
print('IMU data response: ', imu_data)
async def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
async def accelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
async def ambient_light_handler(ambient_light_data):
print('Ambient light data response: ', ambient_light_data)
async def main():
""" This program demonstrates how to enable multiple sensors to stream.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.start(interval=250)
# The asyncio loop will run forever to allow infinite streaming.
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Single Sensor Stream
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def accelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
async def main():
""" This program demonstrates how to enable a single sensor to stream.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
await rvr.sensor_control.start(interval=250)
# The asyncio loop will run forever to allow infinite streaming.
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
System - Get Main App Version
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to obtain the firmware version for a specific processor.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
print('Nordic main application version (target 1): ', nordic_main_application_version)
st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
print('ST main application version (target 2): ', st_main_application_version)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
System - Get Main App Version with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SpheroRvrTargets
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program demonstrates how to obtain the firmware version for a specific processor.
In order to test it, a node.js server must be running on the raspberry-pi connected to RVR.
This code is meant to be executed from a separate computer.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
print('Nordic main application version (target 1): ', nordic_main_application_version)
st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
print('ST main application version (target 2): ', st_main_application_version)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Observer
API and Shell (Pinging)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets
rvr = SpheroRvrObserver()
def echo_handler(echo_response):
print('Echo response: ', echo_response)
def main():
""" This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake.
"""
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.echo(
data=[0, 1, 2],
handler=echo_handler,
target=SpheroRvrTargets.primary.value
)
# Give RVR time to respond
time.sleep(1)
rvr.echo(
data=[3, 4, 5],
handler=echo_handler,
target=SpheroRvrTargets.secondary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.close()
if __name__ == '__main__':
main()
Color Sensor (Color Detection)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
def main():
""" This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
to report colors detected.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.enable_color_detection(is_enabled=True)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
rvr.sensor_control.start(interval=250)
# Allow this program to run for 10 seconds
time.sleep(10)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Drive - Raw Motors
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
def main():
""" This program has RVR drive around in different directions.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.reverse.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.off.value,
left_speed=0, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.off.value,
right_speed=0 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Heading
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask
rvr = SpheroRvrObserver()
def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Heading - Reverse Mode
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask
rvr = SpheroRvrObserver()
def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.drive_control.reset_heading()
rvr.drive_control.drive_forward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.drive_backward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.turn_left_degrees(
heading=0, # Valid heading values are 0-359
amount=90
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Helper - Roll
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.drive_control.reset_heading()
rvr.drive_control.roll_start(
speed=64,
heading=90
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.roll_stop(heading=270)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Infrared - Broadcast IR
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
for i in range(2):
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(2)
except:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Broadcast IR with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.infrared_control.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(4)
except:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Listen For and Send IR
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64
for _ in range(20):
rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
time.sleep(2)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Listen For and Send IR with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
rvr = SpheroRvrObserver()
def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)
codes = [
InfraredCodes.zero,
InfraredCodes.one,
InfraredCodes.two,
InfraredCodes.three
]
for _ in range(20):
rvr.infrared_control.send_infrared_messages(
messages=codes,
strength=64
)
print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))
time.sleep(2)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set All LEDs
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set the all the LEDs.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in [255, 0, 0]]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set All LEDs with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_all_leds_color(color=Colors.yellow)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Multiple LEDs
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set multiple LEDs.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
255, 0, 0,
0, 0, 255
]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Multiple LEDs with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_multiple_leds_with_enums(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
Colors.green,
Colors.blue
]
)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_multiple_leds_with_rgb(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
255, 0, 0,
0, 255, 0
]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Single LED
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set a single LED.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_right.value, # 0xe00
led_brightness_values=[255, 0, 0]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value, # 0x1c0
led_brightness_values=[0, 255, 0]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Single LED with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
def main():
""" This program demonstrates how to set a single LED on RVR using the LED control helper.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_led_rgb(
led=RvrLedGroups.headlight_right,
red=255,
green=0,
blue=0
)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_led_color(
led=RvrLedGroups.headlight_left,
color=Colors.green
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Get Battery State
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
rvr = SpheroRvrObserver()
def battery_percentage_handler(battery_percentage):
print('Battery percentage: ', battery_percentage)
def battery_voltage_handler(battery_voltage_state):
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
def main():
""" This program demonstrates how to retrieve the battery state of RVR.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.get_battery_percentage(handler=battery_percentage_handler)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.get_battery_voltage_state(handler=battery_voltage_handler)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Get Battery State Notifications
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
def battery_voltage_state_change_handler(battery_voltage_state):
print('Battery voltage state: ', battery_voltage_state)
def main():
""" This program demonstrates how to enable battery state change notifications.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
rvr.enable_battery_voltage_state_change_notify(is_enabled=True)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Sleep Monitoring
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
def will_sleep_handler():
print('RVR is about to sleep...')
# here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset
def did_sleep_handler():
print('RVR is asleep...')
def main():
""" This program demonstrates how to register handlers for a) the event received 10 seconds
before RVR will sleep unless some new command is issued and b) the event received
when RVR does go to sleep.
Note that these notifications are received without the need to enable them on the robot.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_will_sleep_notify(will_sleep_handler)
rvr.on_did_sleep_notify(did_sleep_handler)
# Sleep for 310 seconds such that we see the aforementioned events have time to occur
time.sleep(310)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Change Stream Settings
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
def imu_handler(imu_data):
print('IMU data response: ', imu_data)
def ambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
def velocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
def main():
""" This program demonstrates how to update sensor streaming parameters at runtime.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Clearing all services', '----------')
rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
if len(rvr.sensor_control.enabled_sensors) > 0:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Multi Sensor Stream
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
def imu_handler(imu_data):
print('IMU data response: ', imu_data)
def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
def accelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
def ambient_light_handler(ambient_light_data):
print('Ambient light data response: ', ambient_light_data)
def main():
""" This program demonstrates how to enable multiple sensors to stream.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
rvr.sensor_control.start(interval=250)
while True:
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Single Sensor Stream
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
def accelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
def main():
""" This program demonstrates how to enable a single sensor to stream.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
rvr.sensor_control.start(interval=250)
while True:
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
System (Get Main App Version)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets
rvr = SpheroRvrObserver()
def get_nordic_main_application_version_handler(nordic_main_application_version):
print('Nordic main application version (target 1): ', nordic_main_application_version)
def get_st_main_application_version_handler(st_main_application_version):
print('ST main application version (target 2): ', st_main_application_version)
def main():
""" This program demonstrates how to obtain the firmware version for a specific processor.
"""
try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.get_main_application_version(
handler=get_nordic_main_application_version_handler,
target=SpheroRvrTargets.primary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.get_main_application_version(
handler=get_st_main_application_version_handler,
target=SpheroRvrTargets.secondary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Deprecated Options
Node.js SDK for Raspberry Pi
Where to get the code:
Head over to the GitHub repo to grab the newest Node.js SDK!
Client.js SDK for Raspberry Pi
Where to get the code:
This GitHub repo contains the Client.js SDK!