import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
API and Shell (Pinging) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefmain():""" This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake. In order to test it,
a node.js server must be running on the raspberry-pi connected to RVR. This code is meant to be
executed from a separate computer.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection)
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefcolor_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
asyncdefmain():""" This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
to report colors detected. To exit program, press <CTRL-C>
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
whileTrue:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefcolor_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
asyncdefmain():""" This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.
To exit program, press <CTRL-C>
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
whileTrue:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Drive - Raw Motors
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program has RVR drive around in different directions.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.reverse.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.off.value,
left_speed=0, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.off.value,
right_speed=0# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program has RVR drive around in different directions using the function drive_with_heading.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading - Reverse Mode
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program has RVR drive around in different directions using the function drive_with_heading.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program has RVR drive with how to drive RVR using the drive control helper.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.drive_forward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_control.drive_backward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_control.turn_left_degrees(
heading=0, # Valid heading values are 0-359
amount=90
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper - Roll
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program has RVR drive with how to drive RVR using the drive control helper.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.roll_start(
speed=64,
heading=90
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_control.roll_stop(heading=270)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefmain():""" This program has RVR drive using the Rest API. In order to test it, a node.js server must be running on the
raspberry-pi connected to RVR. This code is meant to be executed from a separate computer.
Note:
To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
go on and execute all the statements and exit without the driving ever taking place.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to driveawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Infrared - Broadcast IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
for i in range(2):
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(2)
await rvr.stop_robot_to_robot_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Bradcast IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.infrared_control.start_infrared_broadcasting(
far_code=InfraredCodes.one,
near_code=InfraredCodes.zero
)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to driveawait asyncio.sleep(4)
await rvr.infrared_control.stop_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefinfrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
asyncdefmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64whileTrue:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefinfrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
asyncdefmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)
codes = [
InfraredCodes.zero,
InfraredCodes.one,
InfraredCodes.two,
InfraredCodes.three
]
whileTrue:
await rvr.infrared_control.send_infrared_messages(
messages=codes,
strength=64
)
print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefinfrared_message_received_handler(response):
print('Response data for IR message received:',response)
asyncdefmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
To try this out, write a script for your other robot that a) broadcasts on the corresponding
channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which
RVR sends messages on [in this case channel 3]
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64whileTrue:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set the all the LEDs.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_all_leds_color(color=Colors.yellow)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set multiple LEDs.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
0, 0, 255,
255, 0, 0
]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_multiple_leds_with_enums(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
Colors.green,
Colors.blue
]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_multiple_leds_with_rgb(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
255, 0, 0,
0, 255, 0
]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefmain():""" This program demonstrates how to set multiple LEDs.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
0, 0, 255,
255, 0, 0
]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Single LED
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set a single LED.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_right.value, # 0xe00
led_brightness_values=[255, 0, 0]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value, # 0x1c0
led_brightness_values=[0, 255, 0]
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Single LED with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to set a single LED on RVR using the LED control helper.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_led_rgb(
led=RvrLedGroups.headlight_right,
red=255,
green=0,
blue=0
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.led_control.set_led_color(
led=RvrLedGroups.headlight_left,
color=Colors.green
)
# Delay to show LEDs changeawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Get Battery State
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to retrieve the battery state of RVR.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
battery_percentage = await rvr.get_battery_percentage()
print('Battery percentage: ', battery_percentage)
battery_voltage_state = await rvr.get_battery_voltage_state()
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Set Battery State Notifications
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefbattery_voltage_state_change_handler(battery_voltage_state):
print('Battery voltage state: ', battery_voltage_state)
asyncdefmain():""" This program demonstrates how to enable battery state change notifications.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
await rvr.enable_battery_voltage_state_change_notify(is_enabled=True)
print('Waiting for battery notification...')
# The asyncio loop will run forever to give the aforementioned events time to occurif __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Get Battery State with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefmain():""" This program demonstrates how to retrieve the battery state of RVR and print it to the console.
Echo can be used to check to see if RVR is connected and awake. In order to test it, a node.js
server must be running on the raspberry-pi connected to RVR. This code is meant to be executed
from a separate computer.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
battery_percentage = await rvr.get_battery_percentage()
print('Battery percentage: ', battery_percentage)
battery_voltage_state = await rvr.get_battery_voltage_state()
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Power - Sleep Monitoring
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefwill_sleep_handler():
print('RVR is about to sleep...')
# here we could issue a command to RVR, e.g. wake() such that the sleep timer is resetasyncdefdid_sleep_handler():
print('RVR is asleep...')
asyncdefmain():""" This program demonstrates how to register handlers for a) the event received 10 seconds
before RVR will sleep unless some new command is issued and b) the event received
when RVR does go to sleep.
Note that these notifications are received without the need to enable them on the robot.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.on_will_sleep_notify(handler=will_sleep_handler)
await rvr.on_did_sleep_notify(handler=did_sleep_handler)
# The asyncio loop will run forever to give the aforementioned events time to occurif __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Change Stream Settings
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefimu_handler(imu_data):
print('IMU data response: ', imu_data)
asyncdefambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
asyncdefvelocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
asyncdefmain():""" This program demonstrates how to update sensor streaming parameters at runawait asyncio.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Clearing all services', '----------')
await rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Change Stream Settings with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010
)
)
asyncdefimu_handler(imu_data):
print('IMU data response: ', imu_data)
asyncdefambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
asyncdefvelocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
asyncdefmain():""" This program has RVR drive around in different directions using the function raw_motors.
Note:
To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
go on and execute all the statements and exit without the driving ever taking place.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
await rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
await rvr.sensor_control.stop()
await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
await rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(5)
print('----------', 'Clearing all services', '----------')
await rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor dataawait asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Multi Sensor Stream
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefimu_handler(imu_data):
print('IMU data response: ', imu_data)
asyncdefcolor_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
asyncdefaccelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
asyncdefambient_light_handler(ambient_light_data):
print('Ambient light data response: ', ambient_light_data)
asyncdefmain():""" This program demonstrates how to enable multiple sensors to stream.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
await rvr.sensor_control.start(interval=250)
# The asyncio loop will run forever to allow infinite streaming.if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Sensor Streaming - Single Sensor Stream
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefaccelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
asyncdefmain():""" This program demonstrates how to enable a single sensor to stream.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
await rvr.sensor_control.start(interval=250)
# The asyncio loop will run forever to allow infinite streaming.if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.sensor_control.clear(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
System - Get Main App Version
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
asyncdefmain():""" This program demonstrates how to obtain the firmware version for a specific processor.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
print('Nordic main application version (target 1): ', nordic_main_application_version)
st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
print('ST main application version (target 2): ', st_main_application_version)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
System - Get Main App Version with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SpheroRvrTargets
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010# The port opened by the npm server is always 2010
)
)
asyncdefmain():""" This program demonstrates how to obtain the firmware version for a specific processor.
In order to test it, a node.js server must be running on the raspberry-pi connected to RVR.
This code is meant to be executed from a separate computer.
"""await rvr.wake()
# Give RVR time to wake upawait asyncio.sleep(2)
nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
print('Nordic main application version (target 1): ', nordic_main_application_version)
st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
print('ST main application version (target 2): ', st_main_application_version)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Observer
API and Shell (Pinging)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets
rvr = SpheroRvrObserver()
defecho_handler(echo_response):
print('Echo response: ', echo_response)
defmain():""" This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake.
"""
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.echo(
data=[0, 1, 2],
handler=echo_handler,
target=SpheroRvrTargets.primary.value
)
# Give RVR time to respond
time.sleep(1)
rvr.echo(
data=[3, 4, 5],
handler=echo_handler,
target=SpheroRvrTargets.secondary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.close()
if __name__ == '__main__':
main()
Color Sensor (Color Detection)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
defcolor_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
defmain():""" This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
to report colors detected.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.enable_color_detection(is_enabled=True)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
rvr.sensor_control.start(interval=250)
# Allow this program to run for 10 seconds
time.sleep(10)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Drive - Raw Motors
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
defmain():""" This program has RVR drive around in different directions.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.reverse.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.raw_motors(
left_mode=RawMotorModesEnum.off.value,
left_speed=0, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.off.value,
right_speed=0# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Heading
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask
rvr = SpheroRvrObserver()
defmain():""" This program has RVR drive around in different directions using the function drive_with_heading.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Heading - Reverse Mode
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask
rvr = SpheroRvrObserver()
defmain():""" This program has RVR drive around in different directions using the function drive_with_heading.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.reset_yaw()
rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
defmain():""" This program has RVR drive with how to drive RVR using the drive control helper.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.drive_control.reset_heading()
rvr.drive_control.drive_forward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.drive_backward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.turn_left_degrees(
heading=0, # Valid heading values are 0-359
amount=90
)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Drive with Helper - Roll
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
defmain():""" This program has RVR drive with how to drive RVR using the drive control helper.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.drive_control.reset_heading()
rvr.drive_control.roll_start(
speed=64,
heading=90
)
# Delay to allow RVR to drive
time.sleep(1)
rvr.drive_control.roll_stop(heading=270)
# Delay to allow RVR to drive
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Infrared - Broadcast IR
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
defmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
for i in range(2):
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(2)
except:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Broadcast IR with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
rvr = SpheroRvrObserver()
defmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.infrared_control.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64# Valid speed values are 0-255
)
# Delay to allow RVR to drive
time.sleep(4)
except:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Listen For and Send IR
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
definfrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
defmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64for _ in range(20):
rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
time.sleep(2)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Infrared - Listen For and Send IR with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
rvr = SpheroRvrObserver()
definfrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
defmain():""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)
codes = [
InfraredCodes.zero,
InfraredCodes.one,
InfraredCodes.two,
InfraredCodes.three
]
for _ in range(20):
rvr.infrared_control.send_infrared_messages(
messages=codes,
strength=64
)
print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))
time.sleep(2)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.stop_robot_to_robot_infrared_broadcasting()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set All LEDs
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set the all the LEDs.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in [255, 0, 0]]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set All LEDs with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_all_leds_color(color=Colors.yellow)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Multiple LEDs
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set multiple LEDs.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
led_brightness_values=[
255, 0, 0,
0, 0, 255
]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Multiple LEDs with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_multiple_leds_with_enums(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
Colors.green,
Colors.blue
]
)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_multiple_leds_with_rgb(
leds=[
RvrLedGroups.headlight_left,
RvrLedGroups.headlight_right
],
colors=[
255, 0, 0,
0, 255, 0
]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Single LED
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set a single LED.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_right.value, # 0xe00
led_brightness_values=[255, 0, 0]
)
# Delay to show LEDs change
time.sleep(1)
rvr.set_all_leds(
led_group=RvrLedGroups.headlight_left.value, # 0x1c0
led_brightness_values=[0, 255, 0]
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
LEDs - Set Single LED with Helper
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
rvr = SpheroRvrObserver()
defmain():""" This program demonstrates how to set a single LED on RVR using the LED control helper.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.led_control.turn_leds_off()
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_led_rgb(
led=RvrLedGroups.headlight_right,
red=255,
green=0,
blue=0
)
# Delay to show LEDs change
time.sleep(1)
rvr.led_control.set_led_color(
led=RvrLedGroups.headlight_left,
color=Colors.green
)
# Delay to show LEDs change
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Get Battery State
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates
rvr = SpheroRvrObserver()
defbattery_percentage_handler(battery_percentage):
print('Battery percentage: ', battery_percentage)
defbattery_voltage_handler(battery_voltage_state):
print('Voltage state: ', battery_voltage_state)
state_info = '[{}, {}, {}, {}]'.format(
'{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
'{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
'{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
'{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
)
print('Voltage states: ', state_info)
defmain():""" This program demonstrates how to retrieve the battery state of RVR.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.get_battery_percentage(handler=battery_percentage_handler)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.get_battery_voltage_state(handler=battery_voltage_handler)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Get Battery State Notifications
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
defbattery_voltage_state_change_handler(battery_voltage_state):
print('Battery voltage state: ', battery_voltage_state)
defmain():""" This program demonstrates how to enable battery state change notifications.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
rvr.enable_battery_voltage_state_change_notify(is_enabled=True)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Power - Sleep Monitoring
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
rvr = SpheroRvrObserver()
defwill_sleep_handler():
print('RVR is about to sleep...')
# here we could issue a command to RVR, e.g. wake() such that the sleep timer is resetdefdid_sleep_handler():
print('RVR is asleep...')
defmain():""" This program demonstrates how to register handlers for a) the event received 10 seconds
before RVR will sleep unless some new command is issued and b) the event received
when RVR does go to sleep.
Note that these notifications are received without the need to enable them on the robot.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.on_will_sleep_notify(will_sleep_handler)
rvr.on_did_sleep_notify(did_sleep_handler)
# Sleep for 310 seconds such that we see the aforementioned events have time to occur
time.sleep(310)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Change Stream Settings
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
defimu_handler(imu_data):
print('IMU data response: ', imu_data)
defambient_light_handler(ambient_light_data):
print('Ambient data response:', ambient_light_data)
defvelocity_handler(velocity_data):
print('Velocity data response:', velocity_data)
defmain():""" This program demonstrates how to update sensor streaming parameters at runtime.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
print('----------', 'Enabling IMU at 100 ms', '----------')
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Updating interval to 1000 ms', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Adding ambient light and velocity sensor streams', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.velocity,
handler=velocity_handler
)
rvr.sensor_control.start(interval=1000)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')
rvr.sensor_control.stop()
rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
rvr.sensor_control.start(interval=100)
# Delay to allow RVR to stream sensor data
time.sleep(5)
print('----------', 'Clearing all services', '----------')
rvr.sensor_control.clear()
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
if len(rvr.sensor_control.enabled_sensors) > 0:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Multi Sensor Stream
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
defimu_handler(imu_data):
print('IMU data response: ', imu_data)
defcolor_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
defaccelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
defambient_light_handler(ambient_light_data):
print('Ambient light data response: ', ambient_light_data)
defmain():""" This program demonstrates how to enable multiple sensors to stream.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.imu,
handler=imu_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.ambient_light,
handler=ambient_light_handler
)
rvr.sensor_control.start(interval=250)
whileTrue:
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
Sensor Streaming - Single Sensor Stream
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices
rvr = SpheroRvrObserver()
defaccelerometer_handler(accelerometer_data):
print('Accelerometer data response: ', accelerometer_data)
defmain():""" This program demonstrates how to enable a single sensor to stream.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.accelerometer,
handler=accelerometer_handler
)
rvr.sensor_control.start(interval=250)
whileTrue:
# Delay to allow RVR to stream sensor data
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.sensor_control.clear()
# Delay to allow RVR issue command before closing
time.sleep(.5)
rvr.close()
if __name__ == '__main__':
main()
System (Get Main App Version)
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets
rvr = SpheroRvrObserver()
defget_nordic_main_application_version_handler(nordic_main_application_version):
print('Nordic main application version (target 1): ', nordic_main_application_version)
defget_st_main_application_version_handler(st_main_application_version):
print('ST main application version (target 2): ', st_main_application_version)
defmain():""" This program demonstrates how to obtain the firmware version for a specific processor.
"""try:
rvr.wake()
# Give RVR time to wake up
time.sleep(2)
rvr.get_main_application_version(
handler=get_nordic_main_application_version_handler,
target=SpheroRvrTargets.primary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
rvr.get_main_application_version(
handler=get_st_main_application_version_handler,
target=SpheroRvrTargets.secondary.value
)
# Sleep for one second such that RVR has time to send data back
time.sleep(1)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
finally:
rvr.close()
if __name__ == '__main__':
main()