Examples¶
Code examples can be found in the examples directory on GitHub
Also take a look at the unit tests in the tests directory.
Get all calling users¶
Source: calling_users.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org
5"""
6
7from dotenv import load_dotenv
8
9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))
Get all calling users (async variant)¶
Source: calling_users_async.py
1#!/usr/bin/env python
2"""
3Example script
4Get all calling users within the org using the (experimental) async API
5"""
6import asyncio
7import time
8
9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
16async def get_calling_users():
17 """
18 Get details of all calling enabled users by:
19 1) getting all calling users
20 2) collecting all users that have a calling license
21 3) getting details for all users
22 """
23 async with AsWebexSimpleApi(concurrent_requests=40) as api:
24 print('Collecting calling licenses')
25 calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26 if lic.webex_calling)
27
28 # get users with a calling license
29 calling_users = [user async for user in api.people.list_gen()
30 if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31 print(f'{len(calling_users)} users:')
32 print('\n'.join(user.display_name for user in calling_users))
33
34 # get details for all users
35 start = time.perf_counter()
36 details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37 for user in calling_users])
38 expired = time.perf_counter() - start
39 print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42asyncio.run(get_calling_users())
Default call forwarding settings for all users¶
This example start with the list of all calling users and then calls
wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure()
for each user. To speed up things a
ThreadPoolExecutor
is used and all update operations are scheduled for execution by the thread pool.
Source: reset_call_forwarding.py
1#!/usr/bin/env python
2"""
3Example script
4Reset call forwarding to default for all users in the org
5"""
6import asyncio
7import logging
8import time
9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
16async def main():
17 # load environment. SDk fetches the access token from WEBEX_ACCESS_TOKEN environment variable
18 load_dotenv()
19
20 logging.basicConfig(level=logging.INFO)
21
22 # set to DEBUG to see the actual requests
23 logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
24
25 async with AsWebexSimpleApi() as api:
26
27 # get all calling users
28 start = time.perf_counter_ns()
29 calling_users = [user for user in await api.people.list(calling_data=True)
30 if user.location_id]
31 print(f'Got {len(calling_users)} calling users in '
32 f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
33
34 # set call forwarding to default for all users
35 # default call forwarding settings
36 forwarding = PersonForwardingSetting.default()
37
38 # schedule update for each user and wait for completion
39 start = time.perf_counter_ns()
40 await asyncio.gather(*[api.person_settings.forwarding.configure(person_id=user.person_id,
41 forwarding=forwarding)
42 for user in calling_users])
43 print(f'Reset call forwarding to default for {len(calling_users)} users in '
44 f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
46if __name__ == '__main__':
47 asyncio.run(main())
Modify number of rings configuration for users read from CSV¶
Here we read a bunch of user email addresses from a CSV. The CSV as an ERROR column and we only want to consider users without errors.
For all these users a VM setting is updated and the results are written to a CSV for further processing.
Source: modify_voicemail.py
1#!/usr/bin/env python
2"""
3Example Script
4Modifying number of rings configuration in voicemail settings
5Run -> python3 modify_voicemail.py modify_voicemail.csv
6"""
7import csv
8import sys
9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
23def update_vm_settings():
24 """
25 actually update VM settings for all users present in input CSV
26 """
27 api = WebexSimpleApi()
28 final_report = []
29 mail_ids = []
30 # using wxc_sdk.people.PeopleApi.list to iterate over persons
31 # Parameter calling_data needs to be set to true to gat calling specific information
32 # calling users have the attribute location_id set
33 calling_users = [user for user in api.people.list(calling_data=True)
34 if user.location_id]
35 print(f'{len(calling_users)} users:')
36 print('\n'.join(user.display_name for user in calling_users))
37
38 # get CSV file name from command line
39 with open(str(sys.argv[1]), 'r') as csv_file:
40 reader = csv.DictReader(csv_file)
41 # read all records from CSV. Ony consider records w/o error
42 for col in reader:
43 if not col['ERROR']:
44 # collect email address from USERNAME column for further processing
45 mail_ids.append(col['USERNAME'])
46 else:
47 final_report.append((col['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
48
49 # work on calling users that have an email address that we read from the CSV
50 filteredUsers = [d for d in calling_users if d.emails[0] in mail_ids]
51
52 print("\nCalling Users in CI - Count ", len(calling_users))
53 print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
54 print("FilteredUsers Users - Count", len(filteredUsers))
55
56 def set_number_of_rings(user: Person):
57 """
58 Read VM config for a user
59 :param user: user to update
60 """
61 try:
62 # shortcut
63 vm = api.person_settings.voicemail
64
65 # Read Current configuration
66 vm_settings = vm.read(person_id=user.person_id)
67 print(f'\n Existing Configuration: {vm_settings} ')
68
69 # Modify number of rings value
70 vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
71 vm.configure(person_id=user.person_id, settings=vm_settings)
72 # Read configuration after changes
73 vm_settings = vm.read(person_id=user.person_id)
74 print(f'\n New Configuration: {vm_settings} ')
75 final_report.append((user.display_name, 'SUCCESS'))
76 except Exception as e:
77 final_report.append((user.display_name, 'FAILURE'))
78 print("type error: " + str(e))
79 print(traceback.format_exc())
80 return
81
82 # Modify settings for the filtered users
83 with ThreadPoolExecutor() as pool:
84 list(pool.map(lambda user: set_number_of_rings(user),
85 filteredUsers))
86
87 print(final_report)
88 with open('output.csv', 'w') as f:
89 write = csv.writer(f)
90 write.writerow(["USERNAME", "STATUS"])
91 write.writerows(final_report)
92
93
94if __name__ == '__main__':
95 update_vm_settings()
Holiday schedule w/ US national holidays for all US locations¶
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5"""
6
7import logging
8from collections import defaultdict
9from concurrent.futures import ThreadPoolExecutor
10from datetime import date
11from threading import Lock
12from typing import List
13
14from dotenv import load_dotenv
15
16from calendarific import CalendarifiyApi, Holiday
17from wxc_sdk import WebexSimpleApi
18from wxc_sdk.locations import Location
19from wxc_sdk.all_types import ScheduleType, Event, Schedule
20
21log = logging.getLogger(__name__)
22
23# a lock per location to protect observe_in_location()
24location_locks: dict[str, Lock] = defaultdict(Lock)
25
26# Use parallel threads for provisioning?
27USE_THREADING = True
28
29# True: delete holiday schedule instead of creating one
30CLEAN_UP = False
31
32# first and last year for which to create public holiday events
33FIRST_YEAR = 2022
34LAST_YEAR = 2024
35
36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
37
38
39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
40 """
41 create/update a "National Holiday" schedule in one location
42
43 :param api: Webex api
44 :type api: WebexSimpleApi
45 :param location: location to work on
46 :type location: Location
47 :param holidays: list of holidays to observe
48 :type holidays: List[Holiday]
49 """
50 # there should always only one thread messing with the holiday schedule of a location
51 with location_locks[location.location_id]:
52 year = holidays[0].date.year
53 schedule_name = 'National Holidays'
54
55 # shortcut
56 ats = api.telephony.schedules
57
58 # existing "National Holiday" schedule or None
59 schedule = next((schedule
60 for schedule in ats.list(obj_id=location.location_id,
61 schedule_type=ScheduleType.holidays,
62 name=schedule_name)
63 if schedule.name == schedule_name),
64 None)
65 if CLEAN_UP:
66 if schedule:
67 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
68 ats.delete_schedule(obj_id=location.location_id,
69 schedule_type=ScheduleType.holidays,
70 schedule_id=schedule.schedule_id)
71 return
72 if schedule:
73 # we need the details: list response doesn't have events
74 schedule = ats.details(obj_id=location.location_id,
75 schedule_type=ScheduleType.holidays,
76 schedule_id=schedule.schedule_id)
77 # create list of desired schedule entries
78 # * one per holiday
79 # * only future holidays
80 # * not on a Sunday
81 today = date.today()
82 events = [Event(name=f'{holiday.name} {holiday.date.year}',
83 start_date=holiday.date,
84 end_date=holiday.date,
85 all_day_enabled=True)
86 for holiday in holidays
87 if holiday.date >= today and holiday.date.weekday() != 6]
88
89 if not schedule:
90 # create new schedule
91 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
92 if not events:
93 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
94 return
95 schedule = Schedule(name=schedule_name,
96 schedule_type=ScheduleType.holidays,
97 events=events)
98 log.debug(
99 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100 f'events')
101 schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103 return
104
105 # update existing schedule
106 with ThreadPoolExecutor() as pool:
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_THREADING:
114 list(pool.map(
115 lambda event: ats.event_delete(obj_id=location.location_id,
116 schedule_type=ScheduleType.holidays,
117 event_id=event.event_id),
118 to_delete))
119 else:
120 for event in to_delete:
121 ats.event_delete(obj_id=location.location_id,
122 schedule_type=ScheduleType.holidays,
123 event_id=event.event_id)
124
125 # add events which don't exist yet
126 existing_dates = set(event.start_date
127 for event in schedule.events)
128 to_add = [event
129 for event in events
130 if event.start_date not in existing_dates]
131 if not to_add:
132 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
133 return
134 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
135 if USE_THREADING:
136 list(pool.map(
137 lambda event: ats.event_create(
138 obj_id=location.location_id,
139 schedule_type=ScheduleType.holidays,
140 schedule_id=schedule.schedule_id,
141 event=event),
142 to_add))
143 else:
144 for event in to_add:
145 ats.event_create(
146 obj_id=location.location_id,
147 schedule_type=ScheduleType.holidays,
148 schedule_id=schedule.schedule_id,
149 event=event)
150 log.info(f'observe_in_location({location.name}, {year}): done.')
151 return
152
153
154def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
155 year: int = None):
156 """
157 US national holidays for given locations
158
159 :param api: Webex api
160 :type api: WebexSimpleApi
161 :param locations: list of locations in which US national holidays should be observed
162 :type locations: List[Location]
163 :param year: year for national holidays. Default: current year
164 :type year: int
165 """
166 # default: this year
167 year = year or date.today().year
168
169 # get national holidays for specified year
170 holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
171
172 # update holiday schedule for each location
173 with ThreadPoolExecutor() as pool:
174 if USE_THREADING:
175 list(pool.map(
176 lambda location: observe_in_location(api=api, location=location, holidays=holidays),
177 locations))
178 else:
179 for location in locations:
180 observe_in_location(api=api, location=location, holidays=holidays)
181 return
182
183
184if __name__ == '__main__':
185 # read dotenv which has some environment variables like Webex API token and Calendarify
186 # API key.
187 load_dotenv()
188
189 # enable logging
190 logging.basicConfig(level=logging.DEBUG,
191 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
192 logging.getLogger('urllib3').setLevel(logging.INFO)
193 logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
194
195 # the actual action
196 with WebexSimpleApi(concurrent_requests=5) as wx_api:
197 # get all US locations
198 log.info('Getting locations...')
199 us_locations = [location
200 for location in wx_api.locations.list()
201 if location.address.country == 'US']
202
203 # set up location locks
204 # location_locks is a defaultdict -> accessing with all potential keys creates the locks
205 list(location_locks[loc.location_id] for loc in us_locations)
206
207 # create national holiday schedule for given year(s) and locations
208 if USE_THREADING:
209 with ThreadPoolExecutor() as pool:
210 list(pool.map(
211 lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
212 range(FIRST_YEAR, LAST_YEAR + 1)))
213 else:
214 for year in range(FIRST_YEAR, LAST_YEAR + 1):
215 observe_national_holidays(api=wx_api, year=year, locations=us_locations)
Holiday schedule w/ US national holidays for all US locations (async variant)¶
This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates
a “National Holidays
” holiday schedule for all US locations with all these national holidays.
A rudimentary API implementation in calendarific.py
is used for the requests to https://calendarific.com/.
Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get
a free API account key which then is read from environment variable CALENDARIFIC_KEY
.
Source: us_holidays_async.py
1#!/usr/bin/env python
2"""
3Example script
4Create a holiday schedule for all US locations with all national holidays
5
6Using the asyc SDK variant
7"""
8import asyncio
9import functools
10import logging
11from collections import defaultdict
12from datetime import date
13from typing import List
14
15from dotenv import load_dotenv
16
17from calendarific import CalendarifiyApi, Holiday
18from wxc_sdk.all_types import ScheduleType, Event, Schedule
19from wxc_sdk.as_api import AsWebexSimpleApi
20from wxc_sdk.locations import Location
21
22log = logging.getLogger(__name__)
23
24# a lock per location to protect observe_in_location()
25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
26
27# Use parallel tasks for provisioning?
28USE_TASKS = True
29
30# True: delete holiday schedule instead of creating one
31CLEAN_UP = False
32
33# first and last year for which to create public holiday events
34FIRST_YEAR = 2022
35LAST_YEAR = 2024
36
37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
38
39
40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
41 """
42 create/update a "National Holiday" schedule in one location
43
44 :param api: Webex api
45 :type api: WebexSimpleApi
46 :param location: location to work on
47 :type location: Location
48 :param holidays: list of holidays to observe
49 :type holidays: List[Holiday]
50 """
51 # there should always only one thread messing with the holiday schedule of a location
52 async with location_locks[location.location_id]:
53 year = holidays[0].date.year
54 schedule_name = 'National Holidays'
55
56 # shortcut
57 ats = api.telephony.schedules
58
59 # existing "National Holiday" schedule or None
60 schedule = next((schedule
61 for schedule in await ats.list(obj_id=location.location_id,
62 schedule_type=ScheduleType.holidays,
63 name=schedule_name)
64 if schedule.name == schedule_name),
65 None)
66 if CLEAN_UP:
67 if schedule:
68 log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
69 await ats.delete_schedule(obj_id=location.location_id,
70 schedule_type=ScheduleType.holidays,
71 schedule_id=schedule.schedule_id)
72 return
73 if schedule:
74 # we need the details: list response doesn't have events
75 schedule = await ats.details(obj_id=location.location_id,
76 schedule_type=ScheduleType.holidays,
77 schedule_id=schedule.schedule_id)
78 # create list of desired schedule entries
79 # * one per holiday
80 # * only future holidays
81 # * not on a Sunday
82 today = date.today()
83 events = [Event(name=f'{holiday.name} {holiday.date.year}',
84 start_date=holiday.date,
85 end_date=holiday.date,
86 all_day_enabled=True)
87 for holiday in holidays
88 if holiday.date >= today and holiday.date.weekday() != 6]
89
90 if not schedule:
91 # create new schedule
92 log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
93 if not events:
94 log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
95 return
96 schedule = Schedule(name=schedule_name,
97 schedule_type=ScheduleType.holidays,
98 events=events)
99 log.debug(
100 f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101 f'events')
102 schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103 log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104 return
105
106 # update existing schedule
107 # delete existing events in the past
108 to_delete = [event
109 for event in schedule.events
110 if event.start_date < today]
111 if to_delete:
112 log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113 if USE_TASKS:
114 await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115 schedule_type=ScheduleType.holidays,
116 event_id=event.event_id)
117 for event in to_delete])
118 else:
119 for event in to_delete:
120 await ats.event_delete(obj_id=location.location_id,
121 schedule_type=ScheduleType.holidays,
122 event_id=event.event_id)
123
124 # add events which don't exist yet
125 existing_dates = set(event.start_date
126 for event in schedule.events)
127 to_add = [event
128 for event in events
129 if event.start_date not in existing_dates]
130 if not to_add:
131 log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
132 return
133 log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
134 if USE_TASKS:
135 await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
136 schedule_type=ScheduleType.holidays,
137 schedule_id=schedule.schedule_id,
138 event=event)
139 for event in to_add])
140 else:
141 for event in to_add:
142 await ats.event_create(obj_id=location.location_id,
143 schedule_type=ScheduleType.holidays,
144 schedule_id=schedule.schedule_id,
145 event=event)
146 log.info(f'observe_in_location({location.name}, {year}): done.')
147 return
148
149
150async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
151 year: int = None):
152 """
153 US national holidays for given locations
154
155 :param api: Webex api
156 :type api: WebexSimpleApi
157 :param locations: list of locations in which US national holidays should be observed
158 :type locations: List[Location]
159 :param year: year for national holidays. Default: current year
160 :type year: int
161 """
162 # default: this year
163 year = year or date.today().year
164
165 # get national holidays for specified year
166 loop = asyncio.get_running_loop()
167 # avoid sync all:
168 # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
169 holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
170 country='US', year=year, holiday_type='national'))
171
172 # update holiday schedule for each location
173 if USE_TASKS:
174 await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
175 for location in locations])
176 else:
177 for location in locations:
178 await observe_in_location(api=api, location=location, holidays=holidays)
179 return
180
181
182if __name__ == '__main__':
183 # read dotenv which has some environment variables like Webex API token and Calendarify
184 # API key.
185 load_dotenv()
186
187 # enable logging
188 logging.basicConfig(level=logging.DEBUG,
189 format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
190 logging.getLogger('urllib3').setLevel(logging.INFO)
191 logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)
192
193 # the actual action
194 async def do_provision():
195
196 async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
197 # get all US locations
198 log.info('Getting locations...')
199 us_locations = [location
200 for location in await wx_api.locations.list()
201 if location.address.country == 'US']
202
203 # set up location locks
204 # location_locks is a defaultdict -> accessing with all potential keys creates the locks
205 list(location_locks[loc.location_id] for loc in us_locations)
206
207 # create national holiday schedule for given year(s) and locations
208 if USE_TASKS:
209 await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
210 for year in range(FIRST_YEAR, LAST_YEAR + 1)])
211 else:
212 for year in range(FIRST_YEAR, LAST_YEAR + 1):
213 await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
214
215 asyncio.run(do_provision())
Persist tokens and obtain new tokens interactively¶
A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.
This example code shows how an OAUth Grant flow for an integration can be initiated from a script by using the Python
webbrowser module and calling the open()
method with the authorization URL of a given integration to open that
URL in the system web browser. The user can then authenticate and grant access to the integration. In the last
step of a successful authorization flow the web browser is redirected to the redirect_url
of the
integration.
The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect
.
This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.
The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID
,
TOKEN_INTEGRATION_CLIENT_SECRET
, TOKEN_INTEGRATION_CLIENT_SCOPES
). These variables can also be defined in
get_tokens.env
in the current directory:
# rename this to get_tokens.env and set values
# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=
# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=
# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"
The sample code persists the tokens in get_tokens.yml
in the current directory. On startup the sample code tries
to read tokens from that file. If needed a new access token is obtained using the refresh token.
An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml
Source: get_tokens.py
1#!/usr/bin/env python
2"""
3Example script
4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
5the local web browser
6"""
7import json
8import logging
9import os
10from typing import Optional
11
12from dotenv import load_dotenv
13from yaml import safe_load, safe_dump
14
15from wxc_sdk import WebexSimpleApi
16from wxc_sdk.integration import Integration
17from wxc_sdk.scopes import parse_scopes
18from wxc_sdk.tokens import Tokens
19
20log = logging.getLogger(__name__)
21
22
23def env_path() -> str:
24 """
25 determine path for .env to load environment variables from
26
27 :return: .env file path
28 """
29 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
30
31
32def yml_path() -> str:
33 """
34 determine path of YML file to persist tokens
35
36 :return: path to YML file
37 :rtype: str
38 """
39 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
40
41
42def build_integration() -> Integration:
43 """
44 read integration parameters from environment variables and create an integration
45
46 :return: :class:`wxc_sdk.integration.Integration` instance
47 """
48 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
49 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
50 scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
51 redirect_url = 'http://localhost:6001/redirect'
52 if not all((client_id, client_secret, scopes)):
53 raise ValueError('failed to get integration parameters from environment')
54 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
55 redirect_url=redirect_url)
56
57
58def get_tokens() -> Optional[Tokens]:
59 """
60
61 Tokens are read from a YML file. If needed an OAuth flow is initiated.
62
63 :return: tokens
64 :rtype: :class:`wxc_sdk.tokens.Tokens`
65 """
66
67 def write_tokens(tokens_to_cache: Tokens):
68 with open(yml_path(), mode='w') as f:
69 safe_dump(json.loads(tokens_to_cache.json()), f)
70 return
71
72 def read_tokens() -> Optional[Tokens]:
73 try:
74 with open(yml_path(), mode='r') as f:
75 data = safe_load(f)
76 tokens_read = Tokens.parse_obj(data)
77 except Exception as e:
78 log.info(f'failed to read tokens from file: {e}')
79 tokens_read = None
80 return tokens_read
81
82 integration = build_integration()
83 tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
84 write_to_cache=write_tokens)
85 return tokens
86
87
88logging.basicConfig(level=logging.DEBUG)
89
90# load environment variables from .env
91path = env_path()
92log.info(f'reading {path}')
93load_dotenv(env_path())
94
95tokens = get_tokens()
96
97# use the tokens to get identity of authenticated user
98api = WebexSimpleApi(tokens=tokens)
99me = api.people.me()
100print(f'authenticated as {me.display_name} ({me.emails[0]}) ')
Read/update call intercept settings of a user¶
usage: call_intercept.py [-h] [–token TOKEN] user_email [{on,off}]positional arguments:user_email email address of user{on,off} operation to applyoptions:-h, –help show this help message and exit–token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a .env
file
Source: call_intercept.py
1#!/usr/bin/env python
2"""
3Script to read/update call intercept settings of a calling user.
4
5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
6obtains tokens via an OAuth flow.
7
8 usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
9
10 positional arguments:
11 user_email email address of user
12 {on,off} operation to apply
13
14 options:
15 -h, --help show this help message and exit
16 --token TOKEN admin access token to use
17"""
18import argparse
19import logging
20import os
21import re
22import sys
23from json import loads
24from typing import Optional
25
26from dotenv import load_dotenv
27from wxc_sdk import WebexSimpleApi
28from wxc_sdk.integration import Integration
29from wxc_sdk.person_settings.call_intercept import InterceptSetting
30from wxc_sdk.rest import RestError
31from wxc_sdk.scopes import parse_scopes
32from wxc_sdk.tokens import Tokens
33from yaml import safe_dump, safe_load
34
35log = logging.getLogger(__name__)
36
37
38def env_path() -> str:
39 """
40 determine path for .env to load environment variables from
41
42 :return: .env file path
43 """
44 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
45
46
47def yml_path() -> str:
48 """
49 determine path of YML file to persist tokens
50
51 :return: path to YML file
52 :rtype: str
53 """
54 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
55
56
57def build_integration() -> Integration:
58 """
59 read integration parameters from environment variables and create an integration
60
61 :return: :class:`wxc_sdk.integration.Integration` instance
62 """
63 client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
64 client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
65 scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
66 if scopes:
67 scopes = parse_scopes(scopes)
68 if not all((client_id, client_secret, scopes)):
69 raise ValueError('failed to get integration parameters from environment')
70 redirect_url = 'http://localhost:6001/redirect'
71 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
72 redirect_url=redirect_url)
73
74
75def get_tokens() -> Optional[Tokens]:
76 """
77
78 Tokens are read from a YML file. If needed an OAuth flow is initiated.
79
80 :return: tokens
81 :rtype: :class:`wxc_sdk.tokens.Tokens`
82 """
83
84 def write_tokens(tokens_to_cache: Tokens):
85 with open(yml_path(), mode='w') as f:
86 safe_dump(loads(tokens_to_cache.json()), f)
87 return
88
89 def read_tokens() -> Optional[Tokens]:
90 try:
91 with open(yml_path(), mode='r') as f:
92 data = safe_load(f)
93 tokens_read = Tokens.parse_obj(data)
94 except Exception as e:
95 log.info(f'failed to read tokens from file: {e}')
96 tokens_read = None
97 return tokens_read
98
99 integration = build_integration()
100 tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
101 write_to_cache=write_tokens)
102 return tokens
103
104
105RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
106
107
108def email_type(value):
109 if not RE_EMAIL.match(value):
110 raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
111 return value
112
113
114def main():
115 """
116 where the magic happens
117 """
118 # read .env file with the settings for the integration to be used to obtain tokens
119 load_dotenv(env_path())
120
121 parser = argparse.ArgumentParser()
122 parser.add_argument('user_email', type=email_type, help='email address of user')
123 parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
124 parser.add_argument('--token', type=str, required=False, help='admin access token to use')
125 args = parser.parse_args()
126
127 if args.token:
128 tokens = args.token
129 elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
130 tokens = get_tokens()
131
132 if not tokens:
133 print('Failed to get tokens', file=sys.stderr)
134 exit(1)
135
136 # set level to DEBUG to see debug of REST requests
137 logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
138
139 with WebexSimpleApi(tokens=tokens) as api:
140 # get user
141 email = args.user_email.lower()
142 user = next((user
143 for user in api.people.list(email=email)
144 if user.emails[0] == email), None)
145 if user is None:
146 print(f'User "{email}" not found', file=sys.stderr)
147 exit(1)
148
149 # display call intercept status
150 try:
151 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
152 except RestError as e:
153 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
154 exit(1)
155
156 print('on' if intercept.enabled else 'off')
157 if args.on_off:
158 # action: turn on/off
159 intercept = InterceptSetting.default()
160 intercept.enabled = args.on_off == 'on'
161 try:
162 api.person_settings.call_intercept.configure(person_id=user.person_id,
163 intercept=intercept)
164 except RestError as e:
165 print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
166 exit(1)
167
168 # read call intercept again
169 try:
170 intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
171 except RestError as e:
172 print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
173 exit(1)
174
175 # display state after update
176 print(f"set to {'on' if intercept.enabled else 'off'}")
177
178 exit(0)
179
180
181if __name__ == '__main__':
182 main()
Read/update call queue agent join states¶
usage: queue_helper.py [-h] [–location LOCATION [LOCATION …]][–queue QUEUE [QUEUE …]][–join JOIN_AGENT [JOIN_AGENT …]][–unjoin UNJOIN_AGENT [UNJOIN_AGENT …]][–remove REMOVE_USER [REMOVE_USER …]][–add ADD_USER [ADD_USER …]] [–dryrun][–token TOKEN]Modify call queue settings from the CLIoptional arguments:-h, –help show this help message and exit–location LOCATION [LOCATION …], -l LOCATION [LOCATION …]name of location to work on. If missing then work onall locations.–queue QUEUE [QUEUE …], -q QUEUE [QUEUE …]name(s) of queue(s) to operate on. If missing thenwork on all queues in location.–join JOIN_AGENT [JOIN_AGENT …], -j JOIN_AGENT [JOIN_AGENT …]Join given user(s) on given queue(s). Can be “all” toact on all agents.–unjoin UNJOIN_AGENT [UNJOIN_AGENT …], -u UNJOIN_AGENT [UNJOIN_AGENT …]Unjoin given agent(s) from given queue(s). Can be“all” to act on all agents.–remove REMOVE_USER [REMOVE_USER …], -r REMOVE_USER [REMOVE_USER …]Remove given agent from given queue(s). Can be “all”to act on all agents.–add ADD_USER [ADD_USER …], -a ADD_USER [ADD_USER …]Add given users to given queue(s).–dryrun, -d Dry run; don’t apply any changes–token TOKEN admin access token to use
The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables
which can be set in a queue_helper.env
file in the current directory.
Source: queue_helper.py
1#!/usr/bin/env python
2"""
3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
4 [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
5 [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
6 [--token TOKEN]
7
8Modify call queue settings from the CLI
9
10optional arguments:
11 -h, --help show this help message and exit
12 --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
13 name of location to work on. If missing then work on all locations.
14 --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
15 name(s) of queue(s) to operate on. If missing then work on all queues in location.
16 --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
17 Join given user(s) on given queue(s). Can be "all" to act on all agents.
18 --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
19 Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
20 --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
21 Remove given agent from given queue(s). Can be "all" to act on all agents.
22 --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
23 Add given users to given queue(s).
24 --dryrun, -d Dry run; don't apply any changes
25 --token TOKEN admin access token to use
26"""
27import asyncio
28import logging
29import os
30import sys
31from argparse import ArgumentParser
32from collections.abc import AsyncGenerator
33from typing import Optional
34
35from dotenv import load_dotenv
36
37from wxc_sdk import Tokens
38from wxc_sdk.as_api import AsWebexSimpleApi
39from wxc_sdk.common import UserType
40from wxc_sdk.integration import Integration
41from wxc_sdk.people import Person
42from wxc_sdk.scopes import parse_scopes
43from wxc_sdk.telephony.callqueue import CallQueue
44from wxc_sdk.telephony.hg_and_cq import Agent
45
46
47def agent_name(agent: Agent) -> str:
48 return f'{agent.first_name} {agent.last_name}'
49
50
51def env_path() -> str:
52 """
53 determine path for .env to load environment variables from; based on name of this file
54 :return: .env file path
55 """
56 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
57
58
59def yml_path() -> str:
60 """
61 determine path of YML file to persist tokens
62 :return: path to YML file
63 :rtype: str
64 """
65 return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
66
67
68def build_integration() -> Integration:
69 """
70 read integration parameters from environment variables and create an integration
71 :return: :class:`wxc_sdk.integration.Integration` instance
72 """
73 client_id = os.getenv('INTEGRATION_CLIENT_ID')
74 client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
75 scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
76 redirect_url = 'http://localhost:6001/redirect'
77 if not all((client_id, client_secret, scopes)):
78 raise ValueError('failed to get integration parameters from environment')
79 return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
80 redirect_url=redirect_url)
81
82
83def get_tokens() -> Optional[Tokens]:
84 """
85 Tokens are read from a YML file. If needed an OAuth flow is initiated.
86
87 :return: tokens
88 :rtype: :class:`wxc_sdk.tokens.Tokens`
89 """
90
91 integration = build_integration()
92 tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
93 return tokens
94
95
96async def main():
97 async def act_on_queue(queue: CallQueue):
98 """
99 Act on a single queue
100 """
101 # we need the queue details b/c the queue instance passed as parameter is from a list() call
102 # ... and thus is missing all the details like agents
103 details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
104 agent_names = set(map(agent_name, details.agents))
105
106 def notify(message: str) -> str:
107 """
108 an action notification with queue information
109 """
110 return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'
111
112 def validate_agents(names: list[str], operation: str) -> list[str]:
113 """
114 check if all names in given list exist as agents on current queue
115 """
116 if 'all' in names:
117 return set(agent_names)
118
119 not_found = [name for name in names if name not in agent_names]
120 if not_found:
121 print('\n'.join(notify(f'{name} not found for {operation}"')
122 for name in not_found),
123 file=sys.stderr)
124 return set(name for name in names if name not in set(not_found))
125
126 # validate list of names or join, unjoin, and remove against actual list of agents
127 to_join = validate_agents(join_agents, 'join')
128 to_unjoin = validate_agents(unjoin_agents, 'unjoin')
129 to_remove = validate_agents(remove_users, 'remove')
130
131 # check for agents we are asked to add but which already exist as agents on the queue
132 existing_agent_ids = set(agent.agent_id for agent in details.agents)
133 agent_exists = [agent_name(user) for user in add_users
134 if user.person_id in existing_agent_ids]
135 if agent_exists:
136 print('\n'.join(notify(f'{name} already is agent')
137 for name in agent_exists),
138 file=sys.stderr)
139 # reduced set of users to add
140 to_add = [user for user in add_users
141 if agent_name(user) not in set(agent_exists)]
142 else:
143 # ... or add all users
144 to_add = add_users
145
146 # the updated list of agents for the current queue
147 new_agents = []
148
149 # do we actually need an update?
150 update_needed = False
151
152 # create copy of each agent instance; we don't want to update the original agent objects
153 # to make sure that details still holds the state before any update
154 agents = [agent.copy(deep=True) for agent in details.agents]
155
156 # iterate through the existing agents and see if we have to apply any change
157 for agent in agents:
158 name = agent_name(agent)
159 # do we have to take action to join this agent?
160 if name in to_join and not agent.join_enabled:
161 print(notify(f'{name}, join'))
162 update_needed = True
163 agent.join_enabled = True
164 # do we have to take action to unjoin this agent?
165 if name in to_unjoin and agent.join_enabled:
166 print(notify(f'{name}, unjoin'))
167 update_needed = True
168 agent.join_enabled = False
169 # do we have to remove this agent?
170 if name in to_remove:
171 print(notify(f'{name}, remove'))
172 update_needed = True
173 # skip to next agent; so that we don't add this agent to the updated list of agents
174 continue
175 new_agents.append(agent)
176
177 # add new agents
178 new_agents.extend(Agent(agent_id=user.person_id, user_type=UserType.people)
179 for user in to_add)
180
181 # update the queue
182 if (update_needed or to_add) and not args.dryrun:
183 # simplified update: we only messed with the agents
184 update = CallQueue(agents=new_agents)
185 await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
186 update=update)
187 print(notify('queue updated'))
188 # and get details after the update
189 details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
190 print(notify('got details after update'))
191
192 # print summary
193 print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
194 print(f' phone number: {details.phone_number}')
195 print(f' extension: {details.extension}')
196 print(' agents')
197 if details.agents:
198 name_len = max(map(len, map(agent_name, details.agents)))
199 for agent in details.agents:
200 print(f' {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')
201 return
202
203 async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None, None]:
204 """
205 Validate list of names of users to be added and yield a Person instance for each one
206 """
207 # search for all names in parallel
208 lists: list[list[Person]] = await asyncio.gather(
209 *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
210 for name, user_list in zip(user_names, lists):
211 if isinstance(user_list, Exception):
212 user = None
213 else:
214 user = next((u for u in user_list if name == agent_name(u)), None)
215 if user is None:
216 print(f'user "{name}" not found', file=sys.stderr)
217 continue
218 yield user
219 return
220
221 # parse command line
222 parser = ArgumentParser(description='Modify call queue settings from the CLI')
223 parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
224 help='name of location to work on. If missing then work on all locations.')
225
226 parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
227 help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')
228
229 parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
230 help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')
231
232 parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
233 help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')
234
235 parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
236 help='Remove given agent from given queue(s). Can be "all" to act on all agents.')
237
238 parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
239 help='Add given users to given queue(s).')
240 parser.add_argument('--dryrun', '-d', required=False, action='store_true',
241 help='Dry run; don\'t apply any changes')
242 parser.add_argument('--token', type=str, required=False, help='admin access token to use')
243
244 args = parser.parse_args()
245
246 # get environment variables from .env; required for integration parameters
247 load_dotenv(env_path())
248
249 tokens = args.token or None
250 if tokens is None:
251 # get tokens from cache or create a new set of tokens using the integration defined in .env
252 tokens = get_tokens()
253
254 async with AsWebexSimpleApi(tokens=tokens) as api:
255 # validate location parameter
256 location_names = args.location or []
257
258 # list of all locations with names matching one of the provided names
259 locations = [loc for loc in await api.locations.list()
260 if not location_names or loc.name in set(location_names)]
261
262 if not location_names:
263 print(f'Considering all {len(locations)} locations')
264
265 # set of names of matching locations
266 found_location_names = set(loc.name for loc in locations)
267
268 # Error message for each location name argument not matching an actual location
269 for location_name in location_names:
270 if location_name not in found_location_names:
271 print(f'location "{location_name}" not found', file=sys.stderr)
272
273 if not locations:
274 print('Found no locations to work on', file=sys.stderr)
275 exit(1)
276
277 # which queues do we need to operate on?
278 location_ids = set(loc.location_id for loc in locations)
279 queue_names = args.queue
280 all_queues = queue_names is None
281 # full list of queues
282 queues = await api.telephony.callqueue.list()
283 # filter based on location parameter
284 queues = [queue for queue in queues
285 if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]
286
287 # len of queue names for nicer output
288 queue_len = max(len(queue.name) for queue in queues)
289
290 # now we can actually go back and re-evaluate the list of locations; for the location length we only need
291 # to consider locations we actually have a target queue in
292 location_ids = set(queue.location_id for queue in queues)
293
294 # max length of location names for nicely formatted output
295 location_len = max(len(loc.name)
296 for loc in locations
297 if loc.location_id in location_ids)
298
299 # get the names for join, unjoin, remove, and add
300 join_agents = args.join_agent or []
301 unjoin_agents = args.unjoin_agent or []
302 remove_users = args.remove_user or []
303 add_users = args.add_user or []
304
305 # validate users; make sure that users exist with the provided names
306 add_users = [u async for u in validate_users(user_names=add_users)]
307
308 # apply actions to all queues
309 await asyncio.gather(*[act_on_queue(queue) for queue in queues])
310
311
312if __name__ == '__main__':
313 # enable DEBUG logging to a file; REST log shows all requests
314 logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
315 filemode='w', level=logging.DEBUG)
316 asyncio.run(main())