sf_queue.module

Tracking 6.x-2.x branch
  1. drupal
    1. 6 contributions/salesforce/sf_queue/sf_queue.module

sf_queue.module Implements export queue and administrativa for SalesForce API

Functions & methods

Name — Description
sf_queue_cron — Implements hook_cron().
sf_queue_enqueue — Helper function to add / update a queue item.
sf_queue_form_salesforce_api_settings_form_alter — Implementation of hook_form_salesforce_api_settings_form_alter().
sf_queue_handle_deletes — Helper function to process queue deletes. Since we can delete across object types by SFID alone, plow through 200 deletes at a time, oldest first.
sf_queue_handle_exception — Set global state if API limit is exceeded, password is expired, creds are wrong, or if there are any other "blocker" exceptions. Otherwise, log the exception and update the queue records with failure attempts.
sf_queue_handle_responses
sf_queue_handle_upserts — Helper function to process queue updates and inserts (creates). The logic proceeds basically like this:
sf_queue_menu — Implements hook_menu(). (The listing previously showed the file-level @file docblock here because it sits directly above this function.)
sf_queue_process_queue
sf_queue_process_queue_force
sf_queue_salesforce_api_delete — Implements hook_salesforce_api_delete().
sf_queue_salesforce_api_post_export — Implements hook_salesforce_api_post_export().
sf_queue_salesforce_api_post_unlink — Implements hook_salesforce_api_post_unlink(). Change "updates" to "inserts" and remove sfid when objects are unlinked.
sf_queue_salesforce_api_pre_export — Implements hook_salesforce_api_pre_export().
_sf_queue_default_settings — Helper function for sf_queue_settings variable. Should be used for default sf_queue_settings if it is empty, e.g. variable_get('sf_queue_settings', _sf_queue_default_settings())
_sf_queue_default_state — Helper function for sf_queue_state, which keeps track of which objects have been processed, when they were last processed, etc.

File

View source
  1. <?php
  2. /**
  3. * @file sf_queue.module
  4. * Implements export queue and administrativa for SalesForce API
  5. */
  /**
   * Implements hook_menu().
   *
   * Registers the export queue report page together with the callbacks used to
   * view a single queue item, delete a queue item, and process the queue on
   * demand. Every route is restricted to the 'administer salesforce' permission.
   *
   * @return
   *   An array of menu items keyed by path.
   */
  function sf_queue_menu() {
    $permission = array('administer salesforce');
    $admin_include = 'sf_queue.admin.inc';
    $items = array();

    // Overview report listing the queue contents.
    $items['admin/reports/sf_queue'] = array(
      'title' => 'Salesforce export queue',
      'description' => 'Displays the content of the Salesforce Suite Export queue.',
      'type' => MENU_NORMAL_ITEM,
      'page callback' => 'sf_queue_admin_form',
      'access arguments' => $permission,
      'file' => $admin_include,
    );

    // Detail view of one queue item.
    $items['admin/reports/sf_queue/view'] = array(
      'title' => 'Salesforce export queue - View item',
      'description' => 'Displays the content of a Salesforce Suite Export queue item.',
      'type' => MENU_CALLBACK,
      'page callback' => 'sf_queue_admin_view',
      'access arguments' => $permission,
      'file' => $admin_include,
    );

    // Confirmation form for deleting one queue item; path argument 4 is passed
    // to the form builder.
    $items['admin/reports/sf_queue/delete'] = array(
      'title' => 'Salesforce export queue - Delete item',
      'description' => 'Deletes a Salesforce Suite Export queue item.',
      'type' => MENU_CALLBACK,
      'page callback' => 'drupal_get_form',
      'page arguments' => array('sf_queue_admin_delete_confirm', 4),
      'access arguments' => $permission,
      'file' => $admin_include,
    );

    // Manual trigger; the callback lives in this module file, so no 'file' key.
    $items['admin/reports/sf_queue/processqueue'] = array(
      'title' => 'Salesforce export queue - Process queue now',
      'description' => 'Processes the Salesforce Suite Export queue now.',
      'type' => MENU_CALLBACK,
      'page callback' => 'sf_queue_process_queue_force',
      'access arguments' => $permission,
    );

    return $items;
  }
  /**
   * Implements hook_salesforce_api_pre_export().
   *
   * Diverts an export into the queue when queueing is enabled for the operation.
   *
   * @param $sf_object
   *   The Salesforce object about to be exported. An empty Id is treated as a
   *   'create', anything else as an 'update'.
   * @param $map
   *   The fieldmap object; passed through to sf_queue_enqueue().
   * @param $oid
   *   The Drupal object id.
   *
   * @return
   *   NULL when the queue is disabled, TRUE when the operation is not configured
   *   for queueing, otherwise the return value of sf_queue_enqueue().
   */
  function sf_queue_salesforce_api_pre_export($sf_object, $map, $oid) {
    // If queue is not in use, return.
    if (!variable_get('sf_queue_enabled', FALSE)) {
      watchdog('sf_queue', "Salesforce module is enabled but the queue is not.");
      return;
    }
    $op = empty($sf_object->Id) ? 'create' : 'update';
    $settings = variable_get('sf_queue_settings', _sf_queue_default_settings());
    // Strict comparison is required: a saved checkboxes value stores unchecked
    // options as integer 0, and a loose in_array('create', array(0)) is TRUE on
    // PHP versions before 8, which would queue operations that are switched off.
    if (!in_array($op, $settings['cron_operations'], TRUE)) {
      return TRUE;
    }
    return sf_queue_enqueue($op, $sf_object, $map, $oid);
  }
  /**
   * Implements hook_salesforce_api_post_export().
   *
   * Placeholder: the queue currently takes no action after an export.
   */
  function sf_queue_salesforce_api_post_export($sf_object, $map, $oid, $response) {
    // Intentionally empty. Nothing yet.
  }
  /**
   * Implements hook_salesforce_api_post_unlink().
   *
   * Changes queued "update" operations to "create" and clears their sfid when
   * objects are unlinked.
   *
   * @param $args
   *   Associative array of optional filters. Each non-empty key narrows the set
   *   of queue rows that are reset: 'oid', 'sfid', 'name' (matched against the
   *   fieldmap_name column) and 'drupal_type'.
   */
  function sf_queue_salesforce_api_post_unlink($args) {
    // String literals and '%s' placeholders use single quotes: double-quoted
    // values are a MySQL extension and are parsed as identifiers elsewhere.
    $conditions = array("sf_op = 'update'");
    $sql_args = array();

    // Filter key => SQL condition, in the order the arguments are bound.
    $filters = array(
      'oid' => 'oid = %d',
      'sfid' => "sfid = '%s'",
      'name' => "fieldmap_name = '%s'",
      'drupal_type' => "drupal_type = '%s'",
    );
    foreach ($filters as $key => $condition) {
      if (!empty($args[$key])) {
        $conditions[] = $condition;
        $sql_args[] = $args[$key];
      }
    }

    $sql = "UPDATE {salesforce_export_queue} SET sf_op = 'create', sfid = ''"
      . ' WHERE ' . implode(' AND ', $conditions);
    db_query($sql, $sql_args);
  }
  /**
   * Implements hook_salesforce_api_delete().
   *
   * Queues a Salesforce delete when deletes are configured for queueing.
   *
   * @param $sfid
   *   The Salesforce id of the record to delete.
   * @param $map
   *   The fieldmap object; passed through to sf_queue_enqueue().
   * @param $oid
   *   The Drupal object id.
   *
   * @return
   *   NULL when the queue is disabled, TRUE when deletes are not configured for
   *   queueing, otherwise the return value of sf_queue_enqueue().
   */
  function sf_queue_salesforce_api_delete($sfid, $map, $oid) {
    // If queue is not in use, return.
    if (!variable_get('sf_queue_enabled', FALSE)) {
      return;
    }
    $settings = variable_get('sf_queue_settings', _sf_queue_default_settings());
    // Strict comparison: unchecked checkboxes are stored as integer 0, which a
    // loose in_array() matches against any non-numeric string on PHP < 8.
    if (!in_array('delete', $settings['cron_operations'], TRUE)) {
      return TRUE;
    }
    return sf_queue_enqueue('delete', (object) array('Id' => $sfid), $map, $oid);
  }
  /**
   * Helper function to add / update a queue item.
   *
   * If a queue row already exists for the same Drupal type, fieldmap and object
   * id, that row is overwritten instead of appending a second one.
   *
   * @param $op
   *   The queued operation, e.g. 'create', 'update' or 'delete'.
   * @param $sf_object
   *   The Salesforce object to store in the queue row. Its Id, when present, is
   *   stored as the row's sfid.
   * @param $map
   *   The fieldmap object; its drupal, salesforce and name properties are read.
   * @param $oid
   *   The Drupal object id.
   *
   * @return
   *   FALSE when the item was written to the queue (which prevents an immediate
   *   Salesforce export), TRUE when writing failed so the export is not blocked.
   */
  function sf_queue_enqueue($op, $sf_object, $map, $oid) {
    $object = (object) array(
      'sf_op' => $op,
      'oid' => $oid,
      'attempts' => 0,
      'created' => time(),
      // Will be blank on create.
      'sfid' => empty($sf_object->Id) ? '' : $sf_object->Id,
      'drupal_type' => $map->drupal,
      'sf_type' => $map->salesforce,
      'name' => md5(microtime()),
      'fieldmap_name' => $map->name,
      'sf_data' => $sf_object,
    );

    // If an existing operation is queued for this drupal object / fieldmap pair,
    // update the queue by merging the two records instead of appending to it.
    // The update is keyed on the queue row's own id: keying on oid alone would
    // overwrite every queue row that happens to share the oid, regardless of
    // its Drupal type or fieldmap.
    $update = array();
    $sql = "SELECT id FROM {salesforce_export_queue}"
      . " WHERE drupal_type = '%s' AND fieldmap_name = '%s' AND oid = %d";
    $existing = db_fetch_array(db_query($sql, $map->drupal, $map->name, $oid));
    if ($existing) {
      $update = 'id';
      $object->id = $existing['id'];
    }

    // If we successfully wrote to the queue, then we return FALSE to prevent an
    // immediate salesforce export. If we failed to write to the queue, log an
    // error but don't prevent the export.
    if (drupal_write_record('salesforce_export_queue', $object, $update)) {
      if (user_access('administer salesforce')) {
        drupal_set_message(t('Drupal !type queued for SalesForce export.', array('!type' => $map->drupal)));
      }
      return FALSE;
    }

    // Dumps go through @-placeholders so field values are escaped when the log
    // entry is displayed.
    watchdog('sf_queue', 'Failed to queue SalesForce object. <pre>@object@sf_object</pre>', array(
      '@object' => print_r($object, 1),
      '@sf_object' => print_r($sf_object, 1),
    ), WATCHDOG_ERROR);
    return TRUE;
  }
  /**
   * Implements hook_cron().
   *
   * Processes the export queue when it is enabled and at least the configured
   * 'cron_frequency' number of seconds has passed since the last attempt. The
   * attempt time is recorded before processing starts.
   *
   * @return
   *   FALSE when the queue is disabled; nothing otherwise.
   */
  function sf_queue_cron() {
    $enabled = variable_get('sf_queue_enabled', FALSE);
    if (!$enabled) {
      return FALSE;
    }

    $settings = variable_get('sf_queue_settings', _sf_queue_default_settings());
    $state = variable_get('sf_queue_state', _sf_queue_default_state());

    $now = time();
    $earliest_previous = $now - $settings['cron_frequency'];
    if ($earliest_previous <= $state['last_attempt']) {
      // Not due yet.
      return;
    }

    $state['last_attempt'] = $now;
    variable_set('sf_queue_state', $state);
    sf_queue_process_queue($settings, $state);
  }
  /**
   * Menu callback: processes the export queue immediately.
   *
   * Unlike sf_queue_cron(), the configured frequency is ignored. The attempt
   * time is still recorded in the sf_queue_state variable.
   *
   * @param $redir
   *   When TRUE (the default), redirect to the queue report afterwards.
   *
   * @return
   *   FALSE when the queue is disabled; nothing otherwise.
   */
  function sf_queue_process_queue_force($redir=TRUE) {
    $enabled = variable_get('sf_queue_enabled', FALSE);
    if (!$enabled) {
      return FALSE;
    }

    $settings = variable_get('sf_queue_settings', _sf_queue_default_settings());
    $state = variable_get('sf_queue_state', _sf_queue_default_state());

    // Record this run as the most recent attempt before doing any work.
    $state['last_attempt'] = time();
    variable_set('sf_queue_state', $state);

    sf_queue_process_queue($settings, $state);

    if (user_access('administer salesforce')) {
      drupal_set_message(t('Salesforce export queue processed.'));
    }

    if ($redir) {
      drupal_goto("admin/reports/sf_queue/");
    }
  }
  /**
   * Processes the export queue: deletes first, then updates and creates.
   *
   * @param $settings
   *   The sf_queue_settings array. 'cron_period' (a fraction, capped at .5) and
   *   'cron_operations' are read here; the array is extended with 'start_time'
   *   and 'limit' (seconds) before being handed to the handlers.
   * @param $state
   *   The sf_queue_state array. Currently unused by this function.
   */
  function sf_queue_process_queue($settings, $state) {
    // Never spend more than half of the available run time on the queue.
    $duration = min($settings['cron_period'], .5);

    // max_execution_time is 0 when there is no limit (typical on the command
    // line). Multiplying by 0 would yield a zero-second budget and the upsert
    // loop would never run, so fall back to a fixed allowance.
    // NOTE(review): 240 is meant to mirror the time limit Drupal 6 cron sets for
    // itself - confirm against drupal_cron_run().
    $max_execution_time = (int) ini_get('max_execution_time');
    if ($max_execution_time <= 0) {
      $max_execution_time = 240;
    }
    $limit = $max_execution_time * $duration;
    $start_time = time();

    // Clean up the queue before we do any processing: deletes and updates are
    // impossible without a Salesforce id.
    db_query("DELETE FROM {salesforce_export_queue} WHERE sf_op IN ('delete', 'update') AND (sfid IS NULL OR sfid = '')");

    $settings = array_merge($settings, array('start_time' => $start_time, 'limit' => $limit));
    $operations = $settings['cron_operations'];

    // Order of operations: delete, update, create. Comparisons are strict
    // because unchecked checkboxes are stored as integer 0, which a loose
    // in_array() matches against any operation name on PHP < 8.
    if (in_array('delete', $operations, TRUE)) {
      sf_queue_handle_deletes($settings);
    }
    if (in_array('create', $operations, TRUE) || in_array('update', $operations, TRUE)) {
      sf_queue_handle_upserts($settings);
    }
  }
  /**
   * Helper function to process queue deletes. Since we can delete across object
   * types by SFID alone, plow through one batch of deletes, oldest first.
   *
   * @param $settings
   *   The sf_queue_settings array. Reads 'retry_num_attempts' (negative means no
   *   retry limit), 'cron_num_objects' (batch size) and 'cron_min_threshold'
   *   (minimum number of queued deletes required before anything is sent).
   */
  function sf_queue_handle_deletes($settings) {
    $args = array();
    $sql = "SELECT id, oid, fieldmap_name, drupal_type, sf_type, sfid"
      . " FROM {salesforce_export_queue}"
      . " WHERE sf_op = 'delete'";
    if ($settings['retry_num_attempts'] >= 0) {
      $sql .= ' AND attempts < %d';
      $args[] = $settings['retry_num_attempts'] + 1;
    }
    // Cast so that only an integer can ever be concatenated into the query.
    $sql .= ' ORDER BY created LIMIT ' . (int) $settings['cron_num_objects'];

    $sfids = $deletes = array();
    $result = db_query($sql, $args);
    while ($row = db_fetch_array($result)) {
      $sfids[] = $row['sfid'];
      $deletes[] = $row;
    }
    if (count($deletes) == 0) {
      return;
    }

    // The threshold may arrive as a one-element array (the module's default
    // settings have shipped it as array(0)). Comparing an integer with an array
    // is always "less than", which silently disabled deletes, so reduce it to a
    // plain integer first.
    $threshold = $settings['cron_min_threshold'];
    if (is_array($threshold)) {
      $threshold = reset($threshold);
    }
    $threshold = (int) $threshold;
    if ($threshold && count($deletes) < $threshold) {
      return;
    }

    try {
      $sf = salesforce_api_connect();
      $responses = $sf->client->delete($sfids);
      sf_queue_handle_responses($responses, $deletes);
    }
    catch (Exception $e) {
      sf_queue_handle_exception($e, $deletes);
    }
    // @TODO: Now we need to delete the entry from salesforce_object_map.
  }
  /**
   * Helper function to process queue updates and inserts (creates). The logic
   * proceeds basically like this:
   * - Get a list of object types that we're going to upsert by eliminating those
   *   queue item groups that do not meet the criteria (too many fails, threshold
   *   not met, etc.)
   * - For each object type:
   *   - update one batch of records at a time
   *   - then, create one batch of records at a time
   * - On each iteration of each major loop, break if we have exceeded our
   *   allotted processing time limit.
   *
   * @param $settings
   *   The sf_queue_settings array extended with 'start_time' and 'limit' (both
   *   in seconds). Also reads 'retry_num_attempts', 'cron_min_threshold' and
   *   'cron_num_objects' (batch size).
   */
  function sf_queue_handle_upserts($settings) {
    $deadline = $settings['start_time'] + $settings['limit'];
    $batch_size = (int) $settings['cron_num_objects'];
    $limit_attempts = $settings['retry_num_attempts'] >= 0;
    $max_attempts = $settings['retry_num_attempts'] + 1;

    // Tolerate an array-valued threshold (the default settings have shipped it
    // as array(0)).
    $threshold = $settings['cron_min_threshold'];
    if (is_array($threshold)) {
      $threshold = reset($threshold);
    }
    $threshold = (int) $threshold;

    $count_sql = "SELECT sf_type, COUNT(sf_type) AS total"
      . " FROM {salesforce_export_queue}"
      . " WHERE sf_op IN ('create', 'update')";
    $sql = "SELECT id, oid, fieldmap_name, drupal_type, sf_type, sfid, sf_data"
      . " FROM {salesforce_export_queue}"
      . " WHERE sf_op = '%s' AND sf_type = '%s'";
    $count_args = array();
    if ($limit_attempts) {
      $count_sql .= ' AND attempts < %d';
      $sql .= ' AND attempts < %d';
      $count_args[] = $max_attempts;
    }
    $count_sql .= ' GROUP BY sf_type';
    if ($threshold) {
      $count_sql .= ' HAVING COUNT(sf_type) >= %d';
      $count_args[] = $threshold;
    }
    $count_sql .= ' ORDER BY total DESC';
    $sql .= ' ORDER BY created LIMIT ' . $batch_size;

    $count_result = db_query($count_sql, $count_args);
    while (time() < $deadline && ($row = db_fetch_array($count_result))) {
      $sf_type = $row['sf_type'];
      foreach (array('update', 'create') as $op) {
        // Kept separate from the INSERT arguments built further down, so every
        // pass of the batch loop re-runs the SELECT with the right values.
        $select_args = array($op, $sf_type);
        if ($limit_attempts) {
          $select_args[] = $max_attempts;
        }

        while (time() < $deadline) {
          $items = $objects = array();
          $result = db_query($sql, $select_args);
          while ($queue_item = db_fetch_array($result)) {
            $items[] = $queue_item;
            $object = unserialize($queue_item['sf_data']);
            if ($op == 'create') {
              unset($object->Id);
            }
            $objects[] = $object;
          }
          if (count($items) == 0) {
            // Nothing left to do for this operation.
            break;
          }

          try {
            $sf = salesforce_api_connect();
            $responses = $sf->client->{$op}($objects, $sf_type);
            list($wins, $fails) = sf_queue_handle_responses($responses, $items);
          }
          catch (Exception $e) {
            // Hand over the queue rows (not the responses, which may not even
            // exist) and stop this operation for the current run, rather than
            // retrying the same batch until the time limit is reached.
            sf_queue_handle_exception($e, $items);
            break;
          }

          if (count($fails) > 0) {
            watchdog('sf_queue', 'Failed objects [<pre>@objects</pre>]', array('@objects' => print_r($objects, TRUE)), WATCHDOG_WARNING);
          }
          if (count($wins) == 0) {
            break;
          }

          // Instead of using salesforce_api_id_save to generate 2 SQL queries
          // per record, we collect them all to do 2 massive queries for all the
          // records at once.
          // @TODO: salesforce_api_id_save_multi
          //
          // Responses are matched to queue rows by position, so a failure in
          // the middle of a batch cannot shift later successes onto the wrong
          // row.
          $response_list = is_array($responses) ? $responses : array($responses);
          $now = time();
          $oids = $row_placeholders = $insert_args = array();
          foreach ($response_list as $key => $response) {
            if (empty($response->success) || !isset($items[$key])) {
              continue;
            }
            $item = $items[$key];
            $drupal_type = $item['drupal_type'];
            if (strpos($drupal_type, 'node_') === 0) {
              $drupal_type = 'node';
            }
            $oids[] = $item['oid'];
            $row_placeholders[] = "('%s', %d, '%s', '%s', %d)";
            $insert_args[] = $drupal_type;
            $insert_args[] = $item['oid'];
            $insert_args[] = $response->id;
            $insert_args[] = $item['fieldmap_name'];
            $insert_args[] = $now;
          }
          if (count($oids) > 0) {
            // NOTE(review): this removes map rows by oid alone, so rows of
            // other Drupal types sharing an oid are removed too - confirm
            // whether the match should also include the fieldmap name.
            db_query('DELETE FROM {salesforce_object_map} WHERE oid IN (' . db_placeholders($oids) . ')', $oids);
            db_query('INSERT INTO {salesforce_object_map} (drupal_type, oid, sfid, name, last_export) VALUES ' . implode(', ', $row_placeholders), $insert_args);
          }
        } // while time limit for each operation
      } // foreach operation
    } // while sf_type loop
  }
  /**
   * Sorts Salesforce responses into successes and failures and updates the queue.
   *
   * Successful queue rows are deleted; failed rows get their attempts counter
   * incremented and the Salesforce error is logged.
   *
   * @param $responses
   *   A single response object or an array of them, in the same order as
   *   $dataset.
   * @param $dataset
   *   The queue rows that were sent, as arrays containing at least 'id' (the
   *   queue row id) and 'oid'.
   *
   * @return
   *   array($wins, $fails). $wins maps Salesforce id => Drupal oid. $fails holds
   *   the Drupal oids of failed rows, keyed by Salesforce id when the response
   *   carried one and appended numerically otherwise.
   */
  function sf_queue_handle_responses($responses, $dataset) {
    $wins = $fails = array();
    $win_ids = $fail_ids = array();
    if (!is_array($responses)) {
      $responses = array($responses);
    }

    foreach ($responses as $key => $response) {
      if (!isset($dataset[$key])) {
        // No queue row to attribute this response to.
        continue;
      }
      $item = $dataset[$key];

      if (!empty($response->success)) {
        $wins[$response->id] = $item['oid'];
        $win_ids[] = $item['id'];
        continue;
      }

      // Failed creates come back without an id; appending keeps them from
      // overwriting each other under a single empty key.
      if (!empty($response->id)) {
        $fails[$response->id] = $item['oid'];
      }
      else {
        $fails[] = $item['oid'];
      }
      $fail_ids[] = $item['id'];

      // NOTE(review): 'errors' is assumed to be either one error object or an
      // array of them - confirm against the Salesforce client in use.
      $errors = isset($response->errors) ? $response->errors : array();
      if (!is_array($errors)) {
        $errors = array($errors);
      }
      foreach ($errors as $error) {
        $fields = isset($error->fields) ? $error->fields : '';
        if (is_array($fields)) {
          $fields = implode(', ', $fields);
        }
        watchdog('sf_queue', 'Error from SalesForce:<br>Message - @message<br>Fields - @fields<br>Status code - @code', array(
          '@message' => isset($error->message) ? $error->message : '',
          '@fields' => $fields,
          '@code' => isset($error->statusCode) ? $error->statusCode : '',
        ), WATCHDOG_ERROR);
      }
    }

    // Queue rows are addressed by their own id. Matching on oid would also hit
    // unrelated rows (other Drupal types or operations) that share the oid.
    if (count($win_ids) > 0) {
      $q_str = 'DELETE FROM {salesforce_export_queue} WHERE id IN (' . db_placeholders($win_ids) . ')';
      db_query($q_str, $win_ids);
    }
    if (count($fail_ids) > 0) {
      $q_str = 'UPDATE {salesforce_export_queue} SET attempts = attempts + 1 WHERE id IN (' . db_placeholders($fail_ids) . ')';
      db_query($q_str, $fail_ids);
    }
    return array($wins, $fails);
  }
  /**
   * Logs an exception and updates the affected queue records with a failure
   * attempt.
   *
   * @TODO: Parse the exception and set global state if the API limit is
   * exceeded, the password is expired, the credentials are wrong, or if there
   * are any other "blocker" exceptions.
   *
   * @param $e
   *   The exception that was caught.
   * @param $dataset
   *   Either one queue row (an array with an 'oid' and optionally an 'id' key)
   *   or an array of such rows. Anything else is ignored.
   */
  function sf_queue_handle_exception($e, $dataset) {
    if ($e instanceof Exception) {
      watchdog('sf_queue', 'SalesForce exception: @message', array('@message' => $e->getMessage()), WATCHDOG_ERROR);
    }

    // Accept both a single row and a list of rows.
    $rows = array();
    if (is_array($dataset)) {
      $rows = isset($dataset['oid']) ? array($dataset) : $dataset;
    }

    // Prefer the queue row id; fall back to the Drupal oid for rows that do not
    // carry one. Values read from the database are numeric strings, so test
    // with is_numeric() rather than is_int().
    $ids = $oids = array();
    foreach ($rows as $row) {
      if (!is_array($row)) {
        continue;
      }
      if (isset($row['id']) && is_numeric($row['id'])) {
        $ids[] = (int) $row['id'];
      }
      elseif (isset($row['oid']) && is_numeric($row['oid'])) {
        $oids[] = (int) $row['oid'];
      }
    }

    if (count($ids) > 0) {
      db_query('UPDATE {salesforce_export_queue} SET attempts = attempts + 1 WHERE id IN (' . db_placeholders($ids) . ')', $ids);
    }
    if (count($oids) > 0) {
      db_query('UPDATE {salesforce_export_queue} SET attempts = attempts + 1 WHERE oid IN (' . db_placeholders($oids) . ')', $oids);
    }
  }
  /**
   * Implementation of hook_form_salesforce_api_settings_form_alter().
   *
   * Adds a "Queue Settings" fieldset to the Salesforce API settings form. The
   * enable checkbox is always shown; the detailed queue settings are only added
   * once the queue is enabled.
   *
   * @see salesforce_api/salesforce_api.admin.inc::salesforce_api_settings_form
   */
  function sf_queue_form_salesforce_api_settings_form_alter(&$form, $form_state) {
    $enabled = variable_get('sf_queue_enabled', FALSE);
    $form['sf_queue'] = array(
      '#type' => 'fieldset',
      '#title' => t('Queue Settings'),
      '#collapsible' => TRUE,
      '#collapsed' => TRUE,
      '#weight' => -1,
    );
    if ($enabled) {
      $description = l('View the export queue', 'admin/reports/sf_queue');
    }
    else {
      $description = t('SalesForce Export Queue will attempt to optimize your SalesForce API usage by scheduling transactions for delayed processing, combining multiple object insert/updates, and backlogging failed transactions for re-processing. Enable the Export Queue to configure settings for these features.');
    }
    $form['sf_queue']['sf_queue_enabled'] = array(
      '#type' => 'checkbox',
      '#title' => t('Enable SalesForce Export Queue'),
      '#description' => $description,
      '#default_value' => $enabled,
    );
    if (!$enabled) {
      return;
    }

    $sf_queue_settings = variable_get('sf_queue_settings', _sf_queue_default_settings());

    // Option lists for the select elements below.
    $frequency = drupal_map_assoc(array(0, 60, 180, 300, 600, 900, 1800, 2700, 3600, 10800, 21600, 32400, 43200, 86400), 'format_interval');
    $frequency[0] = t('Every cron run');
    $num_objects = drupal_map_assoc(array(5, 10, 15, 30, 50, 75, 100, 125, 150, 175, 200));
    $threshold = drupal_map_assoc(array(0, 5, 10, 15, 30, 50, 75, 100, 125, 150, 175, 200));
    $threshold[0] = t('No minimum');
    $attempts = drupal_map_assoc(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, -1));
    $attempts[-1] = t('No limit');
    $period = array(
      '.05' => t('5% of cron run'),
      '.1' => t('10% of cron run'),
      '.15' => t('15% of cron run'),
      '.2' => t('20% of cron run'),
      '.25' => t('25% of cron run'),
      '.3' => t('30% of cron run'),
      '.35' => t('35% of cron run'),
      '.4' => t('40% of cron run'),
      '.45' => t('45% of cron run'),
      '.5' => t('50% of cron run'),
    );

    // '#tree' keeps the values nested so they are saved as one sf_queue_settings
    // array.
    $settings = array(
      '#tree' => TRUE,
      'cron_operations' => array(
        '#title' => t('Queued Operations'),
        '#type' => 'checkboxes',
        '#options' => array('create' => t('Create'), 'update' => t('Update'), 'delete' => t('Delete')),
        '#default_value' => $sf_queue_settings['cron_operations'],
        '#description' => t('Which operations should be queued? Check each operation that should cause an API transaction to be queued. Unchecked operations will trigger an API transaction immediately. <strong>Check more of these options if you are running out of API calls.</strong>'),
      ),
      'cron_frequency' => array(
        '#title' => t('Frequency'),
        '#type' => 'select',
        '#options' => $frequency,
        '#default_value' => $sf_queue_settings['cron_frequency'],
        '#description' => t('How often should exports be attempted? Note: exports will only be attempted when cron is run. If your site-wide cron runs less frequently than this setting, the exports will be attempted on every cron run. <strong>If cron is timing out, you should lower this setting so that the queue is processed more frequently.</strong>'),
      ),
      'cron_period' => array(
        '#title' => t('Time'),
        '#type' => 'select',
        '#options' => $period,
        '#default_value' => $sf_queue_settings['cron_period'],
        '#description' => t('Amount of time as a percentage of allotted cron run time that should be devoted to the export queue.'),
      ),
      'cron_num_objects' => array(
        '#title' => t('Number of Object Types'),
        '#type' => 'select',
        '#options' => $num_objects,
        '#default_value' => $sf_queue_settings['cron_num_objects'],
        '#description' => t('How many object types should be processed per cron run? SalesForce allows up to 200 objects of the same type to be created, updated, or deleted in a single API transaction. The export queue will group objects of the same type to maximize efficient use of API transactions. This setting determines how many such object groups should be processed per cron run. <strong>If cron is timing out, you should lower this setting so that fewer API transactions are attempted per cron run.</strong>'),
      ),
      'cron_min_threshold' => array(
        '#title' => t('Minimum Number of Objects to Trigger Export'),
        '#type' => 'select',
        '#options' => $threshold,
        '#default_value' => $sf_queue_settings['cron_min_threshold'],
        '#description' => t('How many objects should be required to trigger an export? The setting "No minimum" means that the export queue will be processed as fully as possible on each cron run, even if only one object is to be created, updated, or deleted. Setting this value too high may delay processing of queued data. <strong>If you are running out of API calls, try raising this setting so that more objects are queued before an API call is made. If your cron runs are timing out, try lowering this setting so that the queue load is smaller on each cron run.</strong>'),
      ),
      'retry_num_attempts' => array(
        '#title' => t('Number of Retry Attempts'),
        '#type' => 'select',
        '#options' => $attempts,
        '#default_value' => $sf_queue_settings['retry_num_attempts'],
        '#description' => t('How many times should an API transaction be retried after a failed attempt? "No limit" means that failed attempts will be retried either until they succeed or are manually removed from the queue. <strong>If you are running out of API calls, try lowering this setting. If your cron runs are timing out, try lowering this setting.</strong>'),
      ),
    );
    $form['sf_queue']['sf_queue_settings'] = $settings;
  }
  /**
   * Helper function for sf_queue_settings variable. Should be used for default
   * sf_queue_settings if it is empty, e.g.
   * variable_get('sf_queue_settings', _sf_queue_default_settings())
   *
   * @return
   *   Associative array with the keys 'cron_operations' (list of queued
   *   operations), 'cron_min_threshold', 'cron_num_objects', 'cron_frequency'
   *   (seconds), 'cron_period' (fraction of the run time, as a string matching
   *   the settings form options) and 'retry_num_attempts'.
   */
  function _sf_queue_default_settings() {
    return array(
      'cron_operations' => array('create', 'update', 'delete'),
      // A plain integer, not array(0): the value is compared against row counts
      // and bound to a %d placeholder, and 0 means "no minimum".
      'cron_min_threshold' => 0,
      'cron_num_objects' => 200,
      'cron_frequency' => 0,
      'cron_period' => '.25',
      'retry_num_attempts' => 1,
    );
  }
  /**
   * Provides the initial value of the sf_queue_state variable, which keeps track
   * of when the queue was last processed.
   *
   * @return
   *   Associative array with a single key, 'last_attempt', initially NULL.
   */
  function _sf_queue_default_state() {
    $state = array();
    // No processing attempt has been recorded yet.
    $state['last_attempt'] = NULL;
    return $state;
  }